Minor: simplify some of the JS runtime integration.

This commit is contained in:
Sebastian Jeltsch
2025-04-28 17:53:46 +02:00
parent d97e9a0366
commit c1a14c249f
+50 -63
View File
@@ -64,7 +64,7 @@ struct DispatchArgs {
enum Message {
Run(Box<dyn (FnOnce(&mut Runtime)) + Send + Sync>),
Dispatch(DispatchArgs),
HttpDispatch(DispatchArgs),
CallFunction(
Option<Module>,
&'static str,
@@ -75,7 +75,7 @@ enum Message {
}
struct State {
sender: async_channel::Sender<Message>,
private_sender: async_channel::Sender<Message>,
connection: Mutex<Option<trailbase_sqlite::Connection>>,
}
@@ -86,7 +86,7 @@ struct RuntimeSingleton {
handle: Option<std::thread::JoinHandle<()>>,
// Shared sender.
sender: async_channel::Sender<Message>,
shared_sender: async_channel::Sender<Message>,
// Isolate state.
state: Vec<State>,
@@ -96,8 +96,8 @@ impl Drop for RuntimeSingleton {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
self.state.clear();
if handle.join().is_err() {
error!("Failed to join main rt thread");
if let Err(err) = handle.join() {
error!("Failed to join main rt thread: {err:?}");
}
}
}
@@ -137,7 +137,7 @@ impl RuntimeSingleton {
Message::Run(f) => {
f(runtime);
}
Message::Dispatch(args) => {
Message::HttpDispatch(args) => {
let channel = args.reply;
let uri = args.uri.clone();
let promise = match runtime.call_function_immediate::<Promise<JsResponse>>(
@@ -280,7 +280,7 @@ impl RuntimeSingleton {
return (
State {
sender,
private_sender: sender,
connection: Mutex::new(None),
},
receiver,
@@ -341,7 +341,7 @@ impl RuntimeSingleton {
return RuntimeSingleton {
n_threads,
sender: shared_sender,
shared_sender,
handle,
state,
};
@@ -360,25 +360,23 @@ impl RuntimeSingleton {
tokio_runtime,
)?;
let idx = index;
runtime
.register_function("isolate_id", move |_args: &[serde_json::Value]| {
return Ok(serde_json::json!(idx));
return Ok(serde_json::json!(index));
})
.expect("Failed to register 'isolate_id' function");
let idx = index;
runtime.register_async_function("query", move |args: Vec<serde_json::Value>| {
Box::pin(async move {
let query: String = get_arg(&args, 0)?;
let json_params: Vec<serde_json::Value> = get_arg(&args, 1)?;
let mut params: Vec<trailbase_sqlite::Value> = vec![];
for value in json_params {
params.push(json_value_to_param(value)?);
}
let params: Vec<trailbase_sqlite::Value> = json_params
.into_iter()
.map(json_value_to_param)
.collect::<Result<Vec<_>, _>>()?;
let Some(conn) = get_runtime(None).state[idx].connection.lock().clone() else {
let Some(conn) = get_runtime(None).state[index].connection.lock().clone() else {
return Err(rustyscript::Error::Runtime(
"missing db connection".to_string(),
));
@@ -396,18 +394,17 @@ impl RuntimeSingleton {
})
})?;
let idx = index;
runtime.register_async_function("execute", move |args: Vec<serde_json::Value>| {
Box::pin(async move {
let query: String = get_arg(&args, 0)?;
let json_params: Vec<serde_json::Value> = get_arg(&args, 1)?;
let mut params: Vec<trailbase_sqlite::Value> = vec![];
for value in json_params {
params.push(json_value_to_param(value)?);
}
let params: Vec<trailbase_sqlite::Value> = json_params
.into_iter()
.map(json_value_to_param)
.collect::<Result<Vec<_>, _>>()?;
let Some(conn) = get_runtime(None).state[idx].connection.lock().clone() else {
let Some(conn) = get_runtime(None).state[index].connection.lock().clone() else {
return Err(rustyscript::Error::Runtime(
"missing db connection".to_string(),
));
@@ -489,20 +486,6 @@ impl RuntimeHandle {
fn state(&self) -> &'static Vec<State> {
return &self.runtime.state;
}
#[allow(unused)]
async fn call_function<T>(
&self,
module: Option<Module>,
name: &'static str,
args: Vec<serde_json::Value>,
) -> Result<T, AnyError>
where
T: serde::de::DeserializeOwned,
{
// Use sender shared by all isolates for round robin.
return call_function::<T>(&self.runtime.sender, module, name, args).await;
}
}
async fn call_function<T>(
@@ -522,7 +505,7 @@ where
return Ok(serde_json::from_value::<T>(resp_receiver.await??)?);
}
pub fn json_value_to_param(
fn json_value_to_param(
value: serde_json::Value,
) -> Result<trailbase_sqlite::Value, rustyscript::Error> {
use rustyscript::Error;
@@ -620,8 +603,8 @@ fn add_route_to_router(
debug!("dispatch {method} {uri}");
runtime_handle
.runtime
.sender
.send(Message::Dispatch(DispatchArgs {
.shared_sender
.send(Message::HttpDispatch(DispatchArgs {
method,
route_path,
uri: uri.to_string(),
@@ -708,7 +691,7 @@ async fn install_routes_and_jobs(
let runtime_handle_clone = runtime_handle.clone();
let routers_clone = routers.clone();
if let Err(err) = state
.sender
.private_sender
.send(Message::Run(Box::new(move |runtime: &mut Runtime| {
// First install a native callbacks.
//
@@ -754,7 +737,7 @@ async fn install_routes_and_jobs(
};
if let Some(msg) = call_function::<Option<String>>(
&first_isolate.sender,
&first_isolate.private_sender,
None,
"__dispatchCron",
vec![id_value],
@@ -819,7 +802,7 @@ async fn await_loading_module(state: &State, module: Module) -> Result<(), AnyEr
let (sender, receiver) = oneshot::channel::<Result<(), AnyError>>();
state
.sender
.private_sender
.send(Message::LoadModule(module, sender))
.await?;
@@ -918,7 +901,7 @@ mod tests {
let handle = RuntimeHandle::new();
handle
.runtime
.sender
.shared_sender
.send(Message::Run(Box::new(|_rt| {
sender.send(5).unwrap();
})))
@@ -943,10 +926,14 @@ mod tests {
"#,
);
let result = handle
.call_function::<String>(Some(module), "test_fun", vec![])
.await
.unwrap();
let result = call_function::<String>(
&handle.runtime.shared_sender,
Some(module),
"test_fun",
vec![],
)
.await
.unwrap();
assert_eq!("test0", result);
}
@@ -978,14 +965,14 @@ mod tests {
"#,
);
let result = handle
.call_function::<Vec<Vec<serde_json::Value>>>(
Some(module),
"test_query",
vec![serde_json::json!("SELECT * FROM test")],
)
.await
.unwrap();
let result = call_function::<Vec<Vec<serde_json::Value>>>(
&handle.runtime.shared_sender,
Some(module),
"test_query",
vec![serde_json::json!("SELECT * FROM test")],
)
.await
.unwrap();
assert_eq!(
vec![
@@ -1026,14 +1013,14 @@ mod tests {
"#,
);
let _result = handle
.call_function::<i64>(
Some(module),
"test_execute",
vec![serde_json::json!("DELETE FROM test")],
)
.await
.unwrap();
let _result = call_function::<i64>(
&handle.runtime.shared_sender,
Some(module),
"test_execute",
vec![serde_json::json!("DELETE FROM test")],
)
.await
.unwrap();
let count: i64 = conn
.read_query_row_f("SELECT COUNT(*) FROM test", (), |row| row.get(0))