diff --git a/crates/core/src/app_state.rs b/crates/core/src/app_state.rs index 45eccc2c..0fe22423 100644 --- a/crates/core/src/app_state.rs +++ b/crates/core/src/app_state.rs @@ -41,7 +41,8 @@ struct InternalState { config: Reactive, json_schema_registry: Arc>, - // TODO: Remove in favor of connection manager. + // TODO: Maybe remove in favor of connection manager. Note that this is currently also used for + // the state.user_conn(). conn: trailbase_sqlite::Connection, logs_conn: trailbase_sqlite::Connection, connection_manager: ConnectionManager, diff --git a/crates/core/src/records/subscribe.rs b/crates/core/src/records/subscribe.rs index 24701277..e5b47d9e 100644 --- a/crates/core/src/records/subscribe.rs +++ b/crates/core/src/records/subscribe.rs @@ -157,9 +157,9 @@ struct PerConnectionState { /// Map from table name to row id to list of subscriptions. /// - /// QUESTION: Could intrusive maps help to streamline the subscription life-cycle? - /// Tentatively: no. Existing rust implementations don't support auto-unlink. - subscriptions: RwLock>, + /// NOTE: Use llayered locking to allow cleaning up per-table subscriptions w/o having to + /// exclusively lock the entire map. + subscriptions: RwLock>>, } impl PerConnectionState { @@ -169,9 +169,13 @@ impl PerConnectionState { // Gets called by the Stream destructor, e.g. when a client disconnects. fn remove_subscription(&self, conn: &rusqlite::Connection, id: SubscriptionId) { - let mut lock = self.subscriptions.write(); + let mut read_lock = self.subscriptions.upgradable_read(); + + let remove_subscription_entry_for_table = { + let Some(mut subscriptions) = read_lock.get(&id.table_name).map(|l| l.write()) else { + return; + }; - if let Some(subscriptions) = lock.get_mut(&id.table_name) { if let Some(row_id) = id.row_id { if let Some(record_subscriptions) = subscriptions.record.get_mut(&row_id) { record_subscriptions.retain(|sub| { @@ -188,13 +192,17 @@ impl PerConnectionState { }); } - if subscriptions.table.is_empty() && subscriptions.record.is_empty() { - lock.remove(&id.table_name); - } - } + subscriptions.table.is_empty() && subscriptions.record.is_empty() + }; - if lock.is_empty() { - conn.preupdate_hook(NO_HOOK); + if remove_subscription_entry_for_table { + // NOTE: Only write lock across all tables when necessary. + read_lock.with_upgraded(|lock| { + lock.remove(&id.table_name); + if lock.is_empty() { + conn.preupdate_hook(NO_HOOK); + } + }); } } @@ -225,7 +233,7 @@ impl PerConnectionState { // If there are no matching subscriptions, skip. { let lock = state.subscriptions.read(); - let Some(subscriptions) = lock.get(&qualified_table_name) else { + let Some(subscriptions) = lock.get(&qualified_table_name).map(|r| r.read()) else { return; }; @@ -289,6 +297,7 @@ impl PerConnectionState { let subscriptions = lock.entry(api.qualified_name().clone()).or_default(); subscriptions + .write() .record .entry(row_id) .or_default() @@ -347,7 +356,7 @@ impl PerConnectionState { let empty = lock.is_empty(); let subscriptions = lock.entry(api.qualified_name().clone()).or_default(); - subscriptions.table.push(Subscription { + subscriptions.write().table.push(Subscription { subscription_id, record_api_name: api.api_name().to_string(), user, @@ -522,7 +531,7 @@ impl SubscriptionManager { let mut count: usize = 0; for state in self.state.connections.read().values() { for (_table_name, subs) in state.subscriptions.read().iter() { - for record in subs.record.values() { + for record in subs.read().record.values() { count += record.len(); } } @@ -535,7 +544,7 @@ impl SubscriptionManager { let mut count: usize = 0; for state in self.state.connections.read().values() { for (_table_name, subs) in state.subscriptions.read().iter() { - count += subs.table.len(); + count += subs.read().table.len(); } } return count; @@ -662,7 +671,7 @@ fn hook_continuation(conn: &rusqlite::Connection, s: ContinuationState) { let mut read_lock = state.subscriptions.upgradable_read(); let (dead_record_subscriptions, dead_table_subscriptions) = { - let Some(subscriptions) = read_lock.get(&table_name) else { + let Some(subscriptions) = read_lock.get(&table_name).map(|r| r.read()) else { return; }; @@ -685,44 +694,49 @@ fn hook_continuation(conn: &rusqlite::Connection, s: ContinuationState) { .as_ref() .is_some_and(|dead| !dead.is_empty() || action == RecordAction::Delete); - // Finally clean up. - if !dead_table_subscriptions.is_empty() || cleanup_record_subscriptions { - // NOTE: we're locking for all tables. If this is a bottlneck we can always lock more - // granularly. - read_lock.with_upgraded(|lock| { - let Some(subscriptions) = lock.get_mut(&table_name) else { - return; - }; + // .Clean up if necessary + if dead_table_subscriptions.is_empty() && !cleanup_record_subscriptions { + return; + } - // Record subscription cleanup. - if let Some(dead_record_subscriptions) = dead_record_subscriptions { - if action == RecordAction::Delete { - // This is unique for record subscriptions: if the record is deleted, cancel all - // subscriptions. + let remove_subscription_entry_for_table = { + let Some(mut subscriptions) = read_lock.get(&table_name).map(|l| l.write()) else { + return; + }; + + // Record subscription cleanup. + if let Some(dead_record_subscriptions) = dead_record_subscriptions { + if action == RecordAction::Delete { + // This is unique for record subscriptions: if the record is deleted, cancel all + // subscriptions. + subscriptions.record.remove(&rowid); + } else if let Some(m) = subscriptions.record.get_mut(&rowid) { + for idx in dead_record_subscriptions.iter().rev() { + m.swap_remove(*idx); + } + + if m.is_empty() { subscriptions.record.remove(&rowid); - } else if let Some(m) = subscriptions.record.get_mut(&rowid) { - for idx in dead_record_subscriptions.iter().rev() { - m.swap_remove(*idx); - } - - if m.is_empty() { - subscriptions.record.remove(&rowid); - } } } + } - // Table subscription cleanup. - { - for idx in dead_table_subscriptions.iter().rev() { - subscriptions.table.swap_remove(*idx); - } + // Table subscription cleanup. + { + for idx in dead_table_subscriptions.iter().rev() { + subscriptions.table.swap_remove(*idx); } + } - if subscriptions.table.is_empty() && subscriptions.table.is_empty() { - lock.remove(&table_name); - if lock.is_empty() { - conn.preupdate_hook(NO_HOOK); - } + subscriptions.table.is_empty() && subscriptions.table.is_empty() + }; + + if remove_subscription_entry_for_table { + // NOTE: Only write lock across all tables when necessary. + read_lock.with_upgraded(|lock| { + lock.remove(&table_name); + if lock.is_empty() { + conn.preupdate_hook(NO_HOOK); } }); }