Small cleanups and add a placeholder for record filters to subscription impl.

This commit is contained in:
Sebastian Jeltsch
2025-09-24 22:46:00 +02:00
parent b3084aee96
commit 7661f6599c
+52 -33
View File
@@ -116,6 +116,11 @@ pub enum DbEvent {
Error(String),
}
pub enum Filter {
Passthrough,
Record(trailbase_qs::ValueOrComposite),
}
pub struct Subscription {
/// Id uniquely identifying this subscription.
subscription_id: i64,
@@ -123,11 +128,13 @@ pub struct Subscription {
/// hot path to make sure we're getting the latest configuration.
record_api_name: String,
/// Record id present for subscriptions to specific records.
// record_id: Option<trailbase_sqlite::Value>,
/// User associated with subscriber.
user: Option<User>,
/// Channel for sending events to the SSE handler.
sender: async_channel::Sender<Event>,
/// Filter
filter: Filter,
}
/// Internal, shareable state of the cloneable SubscriptionManager.
@@ -278,20 +285,26 @@ impl SubscriptionManager {
if let Ok(ev) = Event::default().json_data(DbEvent::Error("Access denied".into())) {
let _ = sub.sender.try_send(ev);
}
dead_subscriptions.push(idx);
sub.sender.close();
}
continue;
}
match sub.sender.try_send(event.clone()) {
Ok(_) => {}
Err(async_channel::TrySendError::Full(ev)) => {
warn!("Channel full, dropping event: {ev:?}");
}
Err(async_channel::TrySendError::Closed(_ev)) => {
dead_subscriptions.push(idx);
sub.sender.close();
if let Filter::Record(ref _qs) = sub.filter {
// TODO: Implement record-level filtering.
}
if let Err(err) = sub.sender.try_send(event.clone()) {
match err {
async_channel::TrySendError::Full(ev) => {
warn!("Channel full, dropping event: {ev:?}");
}
async_channel::TrySendError::Closed(_ev) => {
dead_subscriptions.push(idx);
sub.sender.close();
}
}
}
}
@@ -300,7 +313,7 @@ impl SubscriptionManager {
}
/// Continuation of the preupdate hook being scheduled on the executor.
fn hook_continuation(conn: &rusqlite::Connection, state: ContinuationState) {
fn hook_continuation(conn: &rusqlite::Connection, s: ContinuationState) {
let ContinuationState {
state,
schema_metadata,
@@ -308,17 +321,17 @@ impl SubscriptionManager {
action,
rowid,
record_values,
} = state;
let s = &state;
} = s;
// If schema_metadata is missing, the config/schema must have changed, thus removing the
// subscriptions.
let Some(schema_metadata) = schema_metadata else {
warn!("Table not found: {table_name:?}. Removing subscriptions");
let mut record_subs = s.record_subscriptions.write();
let mut record_subs = state.record_subscriptions.write();
record_subs.remove(&table_name);
let mut table_subs = s.table_subscriptions.write();
let mut table_subs = state.table_subscriptions.write();
table_subs.remove(&table_name);
if record_subs.is_empty() && table_subs.is_empty() {
@@ -363,12 +376,13 @@ impl SubscriptionManager {
};
'record_subs: {
let mut read_lock = s.record_subscriptions.upgradable_read();
let mut read_lock = state.record_subscriptions.upgradable_read();
let Some(subs) = read_lock.get(&table_name).and_then(|m| m.get(&rowid)) else {
break 'record_subs;
};
let dead_subscriptions = Self::broker_subscriptions(s, conn, subs, true, &record, &event);
let dead_subscriptions =
Self::broker_subscriptions(&state, conn, subs, true, &record, &event);
if dead_subscriptions.is_empty() && action != RecordAction::Delete {
// No cleanup needed.
break 'record_subs;
@@ -385,7 +399,7 @@ impl SubscriptionManager {
if table_subscriptions.is_empty() {
subscriptions.remove(&table_name);
if subscriptions.is_empty() && s.table_subscriptions.read().is_empty() {
if subscriptions.is_empty() && state.table_subscriptions.read().is_empty() {
conn.preupdate_hook(NO_HOOK);
}
}
@@ -403,7 +417,7 @@ impl SubscriptionManager {
if table_subscriptions.is_empty() {
subscriptions.remove(&table_name);
if subscriptions.is_empty() && s.table_subscriptions.read().is_empty() {
if subscriptions.is_empty() && state.table_subscriptions.read().is_empty() {
conn.preupdate_hook(NO_HOOK);
}
}
@@ -413,18 +427,19 @@ impl SubscriptionManager {
}
'table_subs: {
let mut read_lock = s.table_subscriptions.upgradable_read();
let mut read_lock = state.table_subscriptions.upgradable_read();
let Some(subs) = read_lock.get(&table_name) else {
break 'table_subs;
};
let dead_subscriptions = Self::broker_subscriptions(s, conn, subs, false, &record, &event);
let dead_subscriptions =
Self::broker_subscriptions(&state, conn, subs, false, &record, &event);
if dead_subscriptions.is_empty() && action != RecordAction::Delete {
// No cleanup needed.
break 'table_subs;
}
read_lock.with_upgraded(move |subscriptions| {
read_lock.with_upgraded(|subscriptions| {
let Some(table_subscriptions) = subscriptions.get_mut(&table_name) else {
return;
};
@@ -436,7 +451,7 @@ impl SubscriptionManager {
if table_subscriptions.is_empty() {
subscriptions.remove(&table_name);
if subscriptions.is_empty() && s.record_subscriptions.read().is_empty() {
if subscriptions.is_empty() && state.record_subscriptions.read().is_empty() {
conn.preupdate_hook(NO_HOOK);
}
}
@@ -445,10 +460,9 @@ impl SubscriptionManager {
}
fn add_hook(&self) {
let s = self.state.clone();
let write_conn = self.state.conn.write_lock();
let state = self.state.clone();
write_conn.preupdate_hook(Some(
self.state.conn.write_lock().preupdate_hook(Some(
move |action: Action, db: &str, table_name: &str, case: &PreUpdateCase| {
let action: RecordAction = match action {
Action::SQLITE_UPDATE | Action::SQLITE_INSERT | Action::SQLITE_DELETE => action.into(),
@@ -469,13 +483,13 @@ impl SubscriptionManager {
};
// If there are no subscriptions, do nothing.
let record_subs_candidate = s
let record_subs_candidate = state
.record_subscriptions
.read()
.get(&qualified_table_name)
.and_then(|m| m.get(&rowid))
.is_some();
let table_subs_candidate = s
let table_subs_candidate = state
.table_subscriptions
.read()
.get(&qualified_table_name)
@@ -489,9 +503,12 @@ impl SubscriptionManager {
return;
};
let state = ContinuationState {
state: s.clone(),
schema_metadata: s.schema_metadata.value().get_table(&qualified_table_name),
let s = ContinuationState {
state: state.clone(),
schema_metadata: state
.schema_metadata
.value()
.get_table(&qualified_table_name),
action,
table_name: qualified_table_name,
rowid,
@@ -501,8 +518,8 @@ impl SubscriptionManager {
// TODO: Optimization: in cases where there's only table-level access restrictions, we
// could avoid the continuation and even dispatch the subscription handling to a
// different thread entirely to take more work off the SQLite thread.
s.conn.call_and_forget(move |conn| {
Self::hook_continuation(conn, state);
state.conn.call_and_forget(move |conn| {
Self::hook_continuation(conn, s);
});
},
));
@@ -547,6 +564,7 @@ impl SubscriptionManager {
// record_id: Some(record),
user,
sender: sender.clone(),
filter: Filter::Passthrough,
});
empty
@@ -595,6 +613,7 @@ impl SubscriptionManager {
record_api_name: api.api_name().to_string(),
user,
sender: sender.clone(),
filter: Filter::Passthrough,
});
empty