Move SQLite connection out of JS runtime.

This commit is contained in:
Sebastian Jeltsch
2025-04-29 22:20:14 +02:00
parent 141a9148a8
commit 134b1e7a84
3 changed files with 66 additions and 73 deletions
+11 -5
View File
@@ -9,7 +9,7 @@ use crate::config::proto::{Config, RecordApiConfig, S3StorageConfig, hash_config
use crate::config::{validate_config, write_config_and_vault_textproto};
use crate::data_dir::DataDir;
use crate::email::Mailer;
use crate::js::RuntimeHandle;
use crate::js::{RuntimeHandle, register_database_functions};
use crate::queue::Queue;
use crate::records::RecordApi;
use crate::records::subscribe::SubscriptionManager;
@@ -452,17 +452,23 @@ pub async fn test_state(options: Option<TestStateOptions>) -> anyhow::Result<App
});
}
#[cfg(test)]
static START: std::sync::Once = std::sync::Once::new();
fn build_js_runtime(conn: trailbase_sqlite::Connection, threads: Option<usize>) -> RuntimeHandle {
let runtime = if let Some(threads) = threads {
RuntimeHandle::new_with_threads(threads)
RuntimeHandle::singleton_or_init_with_threads(threads)
} else {
RuntimeHandle::new()
RuntimeHandle::singleton()
};
#[cfg(test)]
runtime.set_connection(conn, true);
START.call_once(|| {
register_database_functions(&runtime, conn);
});
#[cfg(not(test))]
runtime.set_connection(conn, false);
register_database_functions(&runtime, conn);
return runtime;
}
+5 -5
View File
@@ -7,20 +7,20 @@ mod fallback {
pub(crate) struct RuntimeHandle {}
impl RuntimeHandle {
pub(crate) fn set_connection(&self, _conn: trailbase_sqlite::Connection, r#override: bool) {}
pub(crate) fn new() -> Self {
pub(crate) fn singleton() -> Self {
return Self {};
}
pub(crate) fn new_with_threads(n_threads: usize) -> Self {
pub(crate) fn singleton_or_init_with_threads(_: usize) -> Self {
return Self {};
}
}
pub fn register_database_functions(_: &RuntimeHandle, _: trailbase_sqlite::Connection) {}
}
#[cfg(feature = "v8")]
pub use trailbase_js::runtime::RuntimeHandle;
pub use trailbase_js::runtime::{RuntimeHandle, register_database_functions};
#[cfg(not(feature = "v8"))]
pub use fallback::*;
+50 -63
View File
@@ -3,7 +3,6 @@ use axum::http::{StatusCode, header::CONTENT_TYPE};
use axum::response::{IntoResponse, Response};
use futures_util::future::LocalBoxFuture;
use log::*;
use parking_lot::Mutex;
use rustyscript::{deno_core::PollEventLoopOptions, init_platform, js_value::Promise};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
@@ -67,7 +66,6 @@ pub enum Message {
pub struct State {
private_sender: kanal::AsyncSender<Message>,
connection: Mutex<Option<trailbase_sqlite::Connection>>,
}
impl State {
@@ -126,7 +124,7 @@ impl Drop for RuntimeSingleton {
pub trait Completer {
fn is_ready(&self, runtime: &mut Runtime) -> bool;
fn resolve<'a>(self: Box<Self>, runtime: &'a mut Runtime) -> LocalBoxFuture<'a, ()>;
fn resolve(self: Box<Self>, runtime: &mut Runtime) -> LocalBoxFuture<'_, ()>;
}
pub struct CompleterImpl<T: serde::de::DeserializeOwned + Send + 'static> {
@@ -144,7 +142,7 @@ impl<T: serde::de::DeserializeOwned + Send + 'static> Completer for CompleterImp
return !self.promise.is_pending(runtime);
}
fn resolve<'a>(self: Box<Self>, runtime: &'a mut Runtime) -> LocalBoxFuture<'a, ()> {
fn resolve(self: Box<Self>, runtime: &mut Runtime) -> LocalBoxFuture<'_, ()> {
let resolver = self.resolver;
let promise = self.promise;
Box::pin(async {
@@ -175,15 +173,9 @@ impl RuntimeSingleton {
let (state, receivers): (Vec<State>, Vec<kanal::AsyncReceiver<Message>>) = (0..n_threads)
.map(|_index| {
let (sender, receiver) = kanal::unbounded_async::<Message>();
let (private_sender, private_receiver) = kanal::unbounded_async::<Message>();
return (
State {
private_sender: sender,
connection: Mutex::new(None),
},
receiver,
);
return (State { private_sender }, private_receiver);
})
.unzip();
@@ -265,17 +257,19 @@ impl RuntimeSingleton {
})
.expect("Failed to register 'isolate_id' function");
return Ok(runtime);
}
}
pub fn register_database_functions(handle: &RuntimeHandle, conn: trailbase_sqlite::Connection) {
fn register(runtime: &mut Runtime, conn: trailbase_sqlite::Connection) -> Result<(), Error> {
let conn_clone = conn.clone();
runtime.register_async_function("query", move |args: Vec<serde_json::Value>| {
let conn = conn_clone.clone();
Box::pin(async move {
let query: String = get_arg(&args, 0)?;
let params = json_values_to_params(get_arg(&args, 1)?)?;
let Some(conn) = get_runtime(None).state[index].connection.lock().clone() else {
return Err(rustyscript::Error::Runtime(
"missing db connection".to_string(),
));
};
let rows = conn
.write_query_rows(query, params)
.await
@@ -294,16 +288,11 @@ impl RuntimeSingleton {
})?;
runtime.register_async_function("execute", move |args: Vec<serde_json::Value>| {
let conn = conn.clone();
Box::pin(async move {
let query: String = get_arg(&args, 0)?;
let params = json_values_to_params(get_arg(&args, 1)?)?;
let Some(conn) = get_runtime(None).state[index].connection.lock().clone() else {
return Err(rustyscript::Error::Runtime(
"missing db connection".to_string(),
));
};
let rows_affected = conn
.execute(query, params)
.await
@@ -313,7 +302,31 @@ impl RuntimeSingleton {
})
})?;
return Ok(runtime);
return Ok(());
}
let states = &handle.runtime.state;
let (sender, receiver) = kanal::bounded(states.len());
for state in states {
let conn = conn.clone();
let sender = sender.clone();
state
.private_sender
.as_sync()
.send(Message::Run(
None,
Box::new(move |_, runtime: &mut Runtime, _| {
register(runtime, conn).expect("startup");
sender.send(()).expect("startup");
}),
))
.expect("startup");
}
for _ in 0..states.len() {
receiver.recv().expect("startup");
}
}
@@ -328,11 +341,9 @@ where
{
return Message::Run(
module,
Box::new(
move |module_handle, runtime: &mut Runtime, _completers: &mut Vec<Box<dyn Completer>>| {
resolver(runtime.call_function_immediate::<T>(module_handle, function_name, &args));
},
),
Box::new(move |module_handle, runtime: &mut Runtime, _| {
resolver(runtime.call_function_immediate::<T>(module_handle, function_name, &args));
}),
);
}
@@ -490,13 +501,13 @@ pub struct RuntimeHandle {
impl RuntimeHandle {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
pub fn singleton() -> Self {
return Self {
runtime: get_runtime(None),
};
}
pub fn new_with_threads(n_threads: usize) -> Self {
pub fn singleton_or_init_with_threads(n_threads: usize) -> Self {
return Self {
runtime: get_runtime(Some(n_threads)),
};
@@ -513,21 +524,6 @@ impl RuntimeHandle {
pub async fn send_to_any_isolate(&self, msg: Message) -> Result<(), kanal::SendError> {
return self.runtime.shared_sender.send(msg).await;
}
pub fn set_connection(&self, conn: trailbase_sqlite::Connection, r#override: bool) {
for s in &self.runtime.state {
let mut lock = s.connection.lock();
if lock.is_some() {
if !r#override {
panic!("connection already set");
}
debug!("connection already set");
} else {
lock.replace(conn.clone());
}
}
}
}
fn json_value_to_param(
@@ -646,7 +642,7 @@ mod tests {
async fn test_runtime_apply() {
let (sender, receiver) = tokio::sync::oneshot::channel::<i64>();
let handle = RuntimeHandle::new();
let handle = RuntimeHandle::singleton();
handle
.runtime
.shared_sender
@@ -663,7 +659,7 @@ mod tests {
}
async fn test_runtime_javascript() {
let handle = RuntimeHandle::new();
let handle = RuntimeHandle::singleton();
tracing_subscriber::Registry::default()
.with(tracing_subscriber::filter::LevelFilter::WARN)
@@ -695,16 +691,6 @@ mod tests {
assert_eq!("test0", receiver.await.unwrap().unwrap());
}
fn override_connection(handle: &RuntimeHandle, conn: trailbase_sqlite::Connection) {
for s in &handle.runtime.state {
let mut lock = s.connection.lock();
if lock.is_some() {
debug!("connection already set");
}
lock.replace(conn.clone());
}
}
async fn test_javascript_query() {
let conn = trailbase_sqlite::Connection::open_in_memory().unwrap();
conn
@@ -716,12 +702,13 @@ mod tests {
.await
.unwrap();
let handle = RuntimeHandle::new();
override_connection(&handle, conn);
let handle = RuntimeHandle::singleton();
register_database_functions(&handle, conn);
tracing_subscriber::Registry::default()
.with(tracing_subscriber::filter::LevelFilter::WARN)
.set_default();
let module = Module::new(
"module.ts",
r#"
@@ -778,8 +765,8 @@ mod tests {
.await
.unwrap();
let handle = RuntimeHandle::new();
override_connection(&handle, conn.clone());
let handle = RuntimeHandle::singleton();
register_database_functions(&handle, conn.clone());
tracing_subscriber::Registry::default()
.with(tracing_subscriber::filter::LevelFilter::WARN)