Layered locking to reduce congestion and improve subscription performance.

This commit is contained in:
Sebastian Jeltsch
2025-12-14 00:29:39 +01:00
parent 1875220c67
commit 2de60c2d10
2 changed files with 63 additions and 48 deletions

View File

@@ -41,7 +41,8 @@ struct InternalState {
config: Reactive<Config>,
json_schema_registry: Arc<parking_lot::RwLock<JsonSchemaRegistry>>,
// TODO: Remove in favor of connection manager.
// TODO: Maybe remove in favor of connection manager. Note that this is currently also used for
// the state.user_conn().
conn: trailbase_sqlite::Connection,
logs_conn: trailbase_sqlite::Connection,
connection_manager: ConnectionManager,

View File

@@ -157,9 +157,9 @@ struct PerConnectionState {
/// Map from table name to row id to list of subscriptions.
///
/// QUESTION: Could intrusive maps help to streamline the subscription life-cycle?
/// Tentatively: no. Existing rust implementations don't support auto-unlink.
subscriptions: RwLock<HashMap</* table_name= */ QualifiedName, Subscriptions>>,
/// NOTE: Use llayered locking to allow cleaning up per-table subscriptions w/o having to
/// exclusively lock the entire map.
subscriptions: RwLock<HashMap</* table_name= */ QualifiedName, RwLock<Subscriptions>>>,
}
impl PerConnectionState {
@@ -169,9 +169,13 @@ impl PerConnectionState {
// Gets called by the Stream destructor, e.g. when a client disconnects.
fn remove_subscription(&self, conn: &rusqlite::Connection, id: SubscriptionId) {
let mut lock = self.subscriptions.write();
let mut read_lock = self.subscriptions.upgradable_read();
let remove_subscription_entry_for_table = {
let Some(mut subscriptions) = read_lock.get(&id.table_name).map(|l| l.write()) else {
return;
};
if let Some(subscriptions) = lock.get_mut(&id.table_name) {
if let Some(row_id) = id.row_id {
if let Some(record_subscriptions) = subscriptions.record.get_mut(&row_id) {
record_subscriptions.retain(|sub| {
@@ -188,13 +192,17 @@ impl PerConnectionState {
});
}
if subscriptions.table.is_empty() && subscriptions.record.is_empty() {
lock.remove(&id.table_name);
}
}
subscriptions.table.is_empty() && subscriptions.record.is_empty()
};
if lock.is_empty() {
conn.preupdate_hook(NO_HOOK);
if remove_subscription_entry_for_table {
// NOTE: Only write lock across all tables when necessary.
read_lock.with_upgraded(|lock| {
lock.remove(&id.table_name);
if lock.is_empty() {
conn.preupdate_hook(NO_HOOK);
}
});
}
}
@@ -225,7 +233,7 @@ impl PerConnectionState {
// If there are no matching subscriptions, skip.
{
let lock = state.subscriptions.read();
let Some(subscriptions) = lock.get(&qualified_table_name) else {
let Some(subscriptions) = lock.get(&qualified_table_name).map(|r| r.read()) else {
return;
};
@@ -289,6 +297,7 @@ impl PerConnectionState {
let subscriptions = lock.entry(api.qualified_name().clone()).or_default();
subscriptions
.write()
.record
.entry(row_id)
.or_default()
@@ -347,7 +356,7 @@ impl PerConnectionState {
let empty = lock.is_empty();
let subscriptions = lock.entry(api.qualified_name().clone()).or_default();
subscriptions.table.push(Subscription {
subscriptions.write().table.push(Subscription {
subscription_id,
record_api_name: api.api_name().to_string(),
user,
@@ -522,7 +531,7 @@ impl SubscriptionManager {
let mut count: usize = 0;
for state in self.state.connections.read().values() {
for (_table_name, subs) in state.subscriptions.read().iter() {
for record in subs.record.values() {
for record in subs.read().record.values() {
count += record.len();
}
}
@@ -535,7 +544,7 @@ impl SubscriptionManager {
let mut count: usize = 0;
for state in self.state.connections.read().values() {
for (_table_name, subs) in state.subscriptions.read().iter() {
count += subs.table.len();
count += subs.read().table.len();
}
}
return count;
@@ -662,7 +671,7 @@ fn hook_continuation(conn: &rusqlite::Connection, s: ContinuationState) {
let mut read_lock = state.subscriptions.upgradable_read();
let (dead_record_subscriptions, dead_table_subscriptions) = {
let Some(subscriptions) = read_lock.get(&table_name) else {
let Some(subscriptions) = read_lock.get(&table_name).map(|r| r.read()) else {
return;
};
@@ -685,44 +694,49 @@ fn hook_continuation(conn: &rusqlite::Connection, s: ContinuationState) {
.as_ref()
.is_some_and(|dead| !dead.is_empty() || action == RecordAction::Delete);
// Finally clean up.
if !dead_table_subscriptions.is_empty() || cleanup_record_subscriptions {
// NOTE: we're locking for all tables. If this is a bottlneck we can always lock more
// granularly.
read_lock.with_upgraded(|lock| {
let Some(subscriptions) = lock.get_mut(&table_name) else {
return;
};
// .Clean up if necessary
if dead_table_subscriptions.is_empty() && !cleanup_record_subscriptions {
return;
}
// Record subscription cleanup.
if let Some(dead_record_subscriptions) = dead_record_subscriptions {
if action == RecordAction::Delete {
// This is unique for record subscriptions: if the record is deleted, cancel all
// subscriptions.
let remove_subscription_entry_for_table = {
let Some(mut subscriptions) = read_lock.get(&table_name).map(|l| l.write()) else {
return;
};
// Record subscription cleanup.
if let Some(dead_record_subscriptions) = dead_record_subscriptions {
if action == RecordAction::Delete {
// This is unique for record subscriptions: if the record is deleted, cancel all
// subscriptions.
subscriptions.record.remove(&rowid);
} else if let Some(m) = subscriptions.record.get_mut(&rowid) {
for idx in dead_record_subscriptions.iter().rev() {
m.swap_remove(*idx);
}
if m.is_empty() {
subscriptions.record.remove(&rowid);
} else if let Some(m) = subscriptions.record.get_mut(&rowid) {
for idx in dead_record_subscriptions.iter().rev() {
m.swap_remove(*idx);
}
if m.is_empty() {
subscriptions.record.remove(&rowid);
}
}
}
}
// Table subscription cleanup.
{
for idx in dead_table_subscriptions.iter().rev() {
subscriptions.table.swap_remove(*idx);
}
// Table subscription cleanup.
{
for idx in dead_table_subscriptions.iter().rev() {
subscriptions.table.swap_remove(*idx);
}
}
if subscriptions.table.is_empty() && subscriptions.table.is_empty() {
lock.remove(&table_name);
if lock.is_empty() {
conn.preupdate_hook(NO_HOOK);
}
subscriptions.table.is_empty() && subscriptions.table.is_empty()
};
if remove_subscription_entry_for_table {
// NOTE: Only write lock across all tables when necessary.
read_lock.with_upgraded(|lock| {
lock.remove(&table_name);
if lock.is_empty() {
conn.preupdate_hook(NO_HOOK);
}
});
}