Make connection construction async.

This commit is contained in:
Sebastian Jeltsch
2026-04-30 14:27:25 +02:00
parent 2a2d231492
commit f87e3de3f1
+36 -120
View File
@@ -598,20 +598,21 @@ async fn build_record_apis(
connection_manager: ConnectionManager,
record_api_configs: Reactive<Vec<RecordApiConfig>>,
) -> AsyncReactive<HashMap<String, RecordApi>> {
let x = record_api_configs.derive_unchecked_async(move |DeriveInput { prev, dep }| {
let derived = record_api_configs.derive_unchecked_async(move |DeriveInput { prev, dep }| {
let connection_manager = connection_manager.clone();
let (prev, configs) = (prev.cloned(), dep.clone());
println!("BAR {dep:?}");
let configs: Arc<Vec<RecordApiConfig>> = dep.clone();
let prev = prev.cloned();
return Box::pin(async move {
// TODO: We would need to make the update async if connection building is async or make
// record APIs build the connection+metadata lazily.
// Re-use existing connection when possible to keep subscriptions alive.
let get_conn = |api_name: &str,
attached_databases: &[String]|
-> Result<_, ConnectionError> {
//
// WARN: We need to be very careful to how we rebuild RecordAPIs, since long-lived
// subscriptions may be tied to specific connections. So we need to keep connection alive
// whenever possible, e.g. an ACL changing for one API isn't a good reason to drop
// subscriptions on all APIs.
let get_conn = async |api_name: &str,
attached_databases: &[String]|
-> Result<_, ConnectionError> {
if let Some((_, candidate)) =
prev
.as_ref()
@@ -638,121 +639,36 @@ async fn build_record_apis(
return Ok((conn, metadata));
};
return configs
.iter()
.filter_map(|config| {
let (conn, metadata) = get_conn(config.name(), &config.attached_databases)
.map_err(|err| {
log::error!("Failed to get conn for record API {}: {err}", config.name());
return err;
})
.ok()?;
let mut next: HashMap<String, RecordApi> = HashMap::new();
return match build_record_api(conn, metadata, config.clone()) {
Ok(api) => Some((api.api_name().to_string(), api)),
Err(err) => {
log::error!("Failed to build record API {}: {err}", config.name());
None
}
};
})
.collect();
for config in configs.iter() {
let (conn, metadata) = match get_conn(config.name(), &config.attached_databases).await {
Ok(x) => x,
Err(err) => {
log::error!("Failed to get conn for record API {}: {err}", config.name());
continue;
}
};
match build_record_api(conn, metadata, config.clone()) {
Ok(api) => {
next.insert(api.api_name().to_string(), api);
}
Err(err) => {
log::error!("Failed to build record API {}: {err}", config.name());
}
};
}
return next;
});
});
let _make_sure_snapshot_is_valid = x.ptr().await;
// Give the snapshot a chance to update, otherwise `derived.snapshot()` will return only the
// default empty map.
let _make_sure_snapshot_is_valid = derived.ptr().await;
return x;
// TODO: Rather than init and then observe, could/should this be a derive?;
// let record_apis: Reactive<HashMap<String, RecordApi>> = Reactive::new(
// record_api_configs
// .value()
// .into_iter()
// .map(|config| {
// let ConnectionEntry {
// connection: conn,
// metadata,
// } = if config.attached_databases.is_empty() {
// connection_manager.main_entry()
// } else {
// connection_manager
// .get_entry(
// true,
// Some(config.attached_databases.iter().cloned().collect()),
// )
// .map_err(|err| err.to_string())
// .expect("startup")
// };
//
// let api = build_record_api(conn, metadata, config).expect("startup");
// return (api.api_name().to_string(), api);
// })
// .collect(),
// );
//
// // Rebuild RecordApi instances when config changes.
// //
// // WARN: We need to be very careful to how we rebuild RecordAPIs, since long-lived
// // subscriptions may be tied to specific connections. So we need to keep connection alive
// // whenever possible, e.g. an ACL changing for one API isn't a good reason to drop
// // subscriptions on all APIs.
// {
// let record_apis = record_apis.clone();
// record_api_configs.add_observer(move |record_api_configs| {
// // TODO: We would need to make the update async if connection building is async or make
// // record APIs build the connection+metadata lazily.
//
// record_apis.update_unchecked(|old| {
// // Re-use existing connection when possible to keep subscriptions alive.
// let get_conn =
// |api_name: &str, attached_databases: &[String]| -> Result<_, ConnectionError> {
// if let Some((_, candidate)) = old.iter().find(|(_name, api)| api.api_name() == api_name)
// && candidate.attached_databases() == attached_databases
// {
// return Ok((
// candidate.conn().clone(),
// candidate.connection_metadata().clone(),
// ));
// };
//
// let ConnectionEntry {
// connection: conn,
// metadata,
// } = if attached_databases.is_empty() {
// connection_manager.main_entry()
// } else {
// connection_manager
// .get_entry(true, Some(attached_databases.iter().cloned().collect()))?
// };
//
// return Ok((conn, metadata));
// };
//
// return record_api_configs
// .iter()
// .filter_map(|config| {
// let (conn, metadata) = get_conn(config.name(), &config.attached_databases)
// .map_err(|err| {
// log::error!("Failed to get conn for record API {}: {err}", config.name());
// return err;
// })
// .ok()?;
//
// return match build_record_api(conn, metadata, config.clone()) {
// Ok(api) => Some((api.api_name().to_string(), api)),
// Err(err) => {
// log::error!("Failed to build record API {}: {err}", config.name());
// None
// }
// };
// })
// .collect();
// });
// });
// }
//
// return record_apis;
return derived;
}
fn build_record_api(