Streaming "generic" Connection and add .backup() to the point that TB compiles with it and all tests pass while continuing to use sqlite.

This commit is contained in:
Sebastian Jeltsch
2026-04-29 13:42:54 +02:00
parent 601e5128e2
commit 0b0f91191d
12 changed files with 140 additions and 126 deletions
+9 -6
View File
@@ -218,12 +218,15 @@ mod tests {
#[tokio::test]
async fn test_aggregate_rate_computation() {
let conn = trailbase_sqlite::Connection::new(move || -> Result<_, trailbase_sqlite::Error> {
let mut conn_sync =
crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None).unwrap();
apply_logs_migrations(&mut conn_sync).unwrap();
return Ok(conn_sync);
})
let conn = trailbase_sqlite::Connection::with_opts(
move || -> Result<_, trailbase_sqlite::Error> {
let mut conn_sync =
crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None).unwrap();
apply_logs_migrations(&mut conn_sync).unwrap();
return Ok(conn_sync);
},
Default::default(),
)
.unwrap();
let interval_seconds = 600;
+1 -1
View File
@@ -80,7 +80,7 @@ pub async fn query_handler(
.as_ref(),
)?;
let batched_rows_result = trailbase_sqlite::sqlite::execute_batch(&conn, request.query).await;
let batched_rows_result = trailbase_sqlite::execute_batch(&conn, request.query).await;
// In the fallback case we always need to invalidate the cache.
if must_invalidate_schema_cache {
+11 -8
View File
@@ -38,14 +38,17 @@ fn get_conn_and_migration_path(
let json_registry = state.json_schema_registry().clone();
Ok((
trailbase_sqlite::Connection::new(move || {
// TODO: We should load WASM SQLite functions, since migrations may depend on them.
return trailbase_extension::connect_sqlite(
Some(db_path.clone()),
Some(json_registry.clone()),
)
.map_err(|err| trailbase_sqlite::Error::Other(err.into()));
})?,
trailbase_sqlite::Connection::with_opts(
move || {
// TODO: We should load WASM SQLite functions, since migrations may depend on them.
return trailbase_extension::connect_sqlite(
Some(db_path.clone()),
Some(json_registry.clone()),
)
.map_err(|err| trailbase_sqlite::Error::Other(err.into()));
},
Default::default(),
)?,
migration_path,
))
}
+4 -3
View File
@@ -104,9 +104,10 @@ mod tests {
#[tokio::test]
async fn test_some_sqlite_errors_yield_client_errors() {
let conn = trailbase_sqlite::Connection::new(|| {
crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None)
})
let conn = trailbase_sqlite::Connection::with_opts(
|| crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None),
Default::default(),
)
.unwrap();
conn
+11 -8
View File
@@ -409,16 +409,19 @@ pub(super) fn init_logs_db(
pub fn init_session_db(data_dir: Option<&DataDir>) -> Result<Connection, trailbase_sqlite::Error> {
let path = data_dir.map(|d| d.session_db_path());
return trailbase_sqlite::Connection::new(|| -> Result<_, trailbase_sqlite::Error> {
// NOTE: The logs db needs the trailbase extensions for the maxminddb geoip lookup.
let mut conn = connect_rusqlite_without_default_extensions_and_schemas(path.clone())?;
return trailbase_sqlite::Connection::with_opts(
|| -> Result<_, trailbase_sqlite::Error> {
// NOTE: The logs db needs the trailbase extensions for the maxminddb geoip lookup.
let mut conn = connect_rusqlite_without_default_extensions_and_schemas(path.clone())?;
trailbase_extension::register_all_extension_functions(&conn, None)?;
trailbase_extension::register_all_extension_functions(&conn, None)?;
apply_session_migrations(&mut conn)
.map_err(|err| trailbase_sqlite::Error::Other(err.into()))?;
return Ok(conn);
});
apply_session_migrations(&mut conn)
.map_err(|err| trailbase_sqlite::Error::Other(err.into()))?;
return Ok(conn);
},
Default::default(),
);
}
pub(crate) fn connect_rusqlite_without_default_extensions_and_schemas(
+1 -1
View File
@@ -20,7 +20,7 @@ path = "benches/join-order/main.rs"
harness = false
[features]
default = ["pg"]
default = []
generic = ["pg"]
pg = ["dep:postgres", "dep:bytes", "dep:pgrow2serde", "dep:regex"]
+52 -63
View File
@@ -44,29 +44,37 @@ pub struct Connection {
#[allow(unused)]
impl Connection {
pub fn new<E>(builder: impl Fn() -> Result<Executor, E>) -> Result<Self, Error>
pub fn new(exec: Executor) -> Self {
return Self {
id: UNIQUE_CONN_ID.fetch_add(1, Ordering::SeqCst),
exec,
};
}
/// TODO: Should be renamed. Default to sqlite for POC.
pub fn with_opts<E>(
builder: impl Fn() -> Result<rusqlite::Connection, E>,
opts: crate::sqlite::executor::Options,
) -> Result<Self, Error>
where
Error: From<E>,
{
return Ok(Self {
id: UNIQUE_CONN_ID.fetch_add(1, Ordering::SeqCst),
exec: builder()?,
});
return Ok(Self::new(Executor::Sqlite(
crate::sqlite::executor::Executor::new(builder, opts.clone())?,
)));
}
pub fn open_in_memory() -> Result<Self, Error> {
return Self::new(|| -> Result<_, Error> {
let exec = crate::sqlite::executor::Executor::new(
rusqlite::Connection::open_in_memory,
crate::sqlite::executor::Options {
num_threads: Some(1),
..Default::default()
},
)?;
assert_eq!(1, exec.threads());
let inst = Self::with_opts(
rusqlite::Connection::open_in_memory,
crate::sqlite::executor::Options {
num_threads: Some(1),
..Default::default()
},
)?;
assert_eq!(1, inst.threads());
return Ok(Executor::Sqlite(exec));
});
return Ok(inst);
}
pub fn id(&self) -> usize {
@@ -422,12 +430,12 @@ impl Connection {
let query = format!("ATTACH DATABASE '{path}' AS {name} ");
exec.map(move |conn| {
conn.execute(&query, ())?;
return Ok(());
Ok(())
})
}
Executor::Pg(_) => {
// TBD
return Err(Error::NotSupported);
Err(Error::NotSupported)
}
};
}
@@ -438,55 +446,29 @@ impl Connection {
let query = format!("DETACH DATABASE {name}");
exec.map(move |conn| {
conn.execute(&query, ())?;
return Ok(());
Ok(())
})
}
Executor::Pg(_) => {
// TBD
return Err(Error::NotSupported);
Err(Error::NotSupported)
}
};
}
// pub async fn backup(&self, path: impl AsRef<std::path::Path>) -> Result<(), Error> {
// let mut dst = rusqlite::Connection::open(path)?;
// return self
// .exec
// .call_reader(move |src_conn| -> Result<(), Error> {
// use rusqlite::backup::{Backup, StepResult};
//
// let backup = Backup::new(src_conn, &mut dst)?;
// let mut retries = 0;
//
// loop {
// match backup.step(/* num_pages= */ 128)? {
// StepResult::Done => {
// return Ok(());
// }
// StepResult::More => {
// retries = 0;
// // Just continue.
// }
// StepResult::Locked | StepResult::Busy => {
// retries += 1;
// if retries > 100 {
// return Err(Error::Other("Backup failed".into()));
// }
//
// // Retry.
// std::thread::sleep(std::time::Duration::from_micros(100));
// }
// r => {
// // Non-exhaustive enum.
// return Err(Error::Other(
// format!("unexpected backup step result {r:?}").into(),
// ));
// }
// }
// }
// })
// .await;
// }
pub async fn backup(&self, path: impl AsRef<std::path::Path>) -> Result<(), Error> {
return match self.exec {
Executor::Sqlite(ref exec) => {
let mut dst = rusqlite::Connection::open(path)?;
exec
.call_reader(move |src_conn| -> Result<(), Error> {
return crate::sqlite::util::backup(src_conn, &mut dst);
})
.await
}
Executor::Pg(_) => Err(Error::NotSupported),
};
}
pub async fn list_databases(&self) -> Result<Vec<Database>, Error> {
return match self.exec {
@@ -633,6 +615,16 @@ impl<'a> SyncTransactionTrait for Transaction<'a> {
}
}
pub async fn execute_batch(
conn: &Connection,
sql: impl AsRef<str> + Send + 'static,
) -> Result<Option<Rows>, Error> {
return match conn.exec {
Executor::Sqlite(ref exec) => crate::sqlite::batch::execute_batch_impl(exec, sql).await,
Executor::Pg(_) => Err(Error::NotSupported),
};
}
static UNIQUE_CONN_ID: AtomicUsize = AtomicUsize::new(0);
#[cfg(test)]
@@ -658,10 +650,7 @@ mod tests {
#[tokio::test]
async fn generic_pg_poc_test() {
let conn = Connection::new(|| -> Result<_, Error> {
return Ok(Executor::Pg(build_executor()?));
})
.unwrap();
let conn = Connection::new(Executor::Pg(build_executor().unwrap()));
assert_eq!(2, conn.threads());
+3 -2
View File
@@ -28,13 +28,15 @@ mod pg;
#[cfg(not(feature = "generic"))]
mod connection_imports {
pub use super::sqlite::batch::execute_batch;
pub use super::sqlite::connection::{ArcLockGuard, Connection, LockError, LockGuard, Options};
pub use super::sqlite::sync::SyncConnection;
pub use super::sqlite::transaction::Transaction;
}
#[cfg(feature = "generic")]
mod connection_imports {
pub use super::generic::{Connection, SyncConnection};
pub use super::generic::{Connection, SyncConnection, Transaction, execute_batch};
pub use super::sqlite::connection::{ArcLockGuard, LockError, LockGuard, Options};
}
@@ -44,7 +46,6 @@ pub use database::Database;
pub use error::Error;
pub use params::{NamedParamRef, NamedParams, NamedParamsRef, Params};
pub use rows::{Row, Rows, ValueType};
pub use sqlite::transaction::Transaction;
pub use statement::Statement;
pub use traits::SyncConnection as SyncConnectionTrait;
pub use value::{Value, ValueRef};
+9 -2
View File
@@ -5,6 +5,7 @@ use super::util::{columns, from_row};
use crate::error::Error;
use crate::rows::{Column, Rows};
use crate::sqlite::connection::Connection;
use crate::sqlite::executor::Executor;
/// Batch execute SQL statements and return rows of last statement.
///
@@ -15,8 +16,14 @@ pub async fn execute_batch(
conn: &Connection,
sql: impl AsRef<str> + Send + 'static,
) -> Result<Option<Rows>, Error> {
return conn
.exec
return execute_batch_impl(&conn.exec, sql).await;
}
pub(crate) async fn execute_batch_impl(
exec: &Executor,
sql: impl AsRef<str> + Send + 'static,
) -> Result<Option<Rows>, Error> {
return exec
.call_writer(
move |conn: &mut rusqlite::Connection| -> Result<Option<Rows>, Error> {
let batch = rusqlite::Batch::new(conn, sql.as_ref());
+1 -31
View File
@@ -332,37 +332,7 @@ impl Connection {
return self
.exec
.call_reader(move |src_conn| -> Result<(), Error> {
use rusqlite::backup::{Backup, StepResult};
let backup = Backup::new(src_conn, &mut dst)?;
let mut retries = 0;
loop {
match backup.step(/* num_pages= */ 128)? {
StepResult::Done => {
return Ok(());
}
StepResult::More => {
retries = 0;
// Just continue.
}
StepResult::Locked | StepResult::Busy => {
retries += 1;
if retries > 100 {
return Err(Error::Other("Backup failed".into()));
}
// Retry.
std::thread::sleep(std::time::Duration::from_micros(100));
}
r => {
// Non-exhaustive enum.
return Err(Error::Other(
format!("unexpected backup step result {r:?}").into(),
));
}
}
}
return crate::sqlite::util::backup(src_conn, &mut dst);
})
.await;
}
+1 -1
View File
@@ -1,4 +1,4 @@
mod batch;
pub(super) mod batch;
pub(super) mod connection;
pub(super) mod executor;
mod lock;
+37
View File
@@ -145,3 +145,40 @@ pub(crate) fn list_databases(conn: &rusqlite::Connection) -> Result<Vec<Database
return Ok(dbs);
}
pub(crate) fn backup(
src: &rusqlite::Connection,
dst: &mut rusqlite::Connection,
) -> Result<(), Error> {
use rusqlite::backup::{Backup, StepResult};
let backup = Backup::new(src, dst)?;
let mut retries = 0;
loop {
match backup.step(/* num_pages= */ 128)? {
StepResult::Done => {
return Ok(());
}
StepResult::More => {
retries = 0;
// Just continue.
}
StepResult::Locked | StepResult::Busy => {
retries += 1;
if retries > 100 {
return Err(Error::Other("Backup failed".into()));
}
// Retry.
std::thread::sleep(std::time::Duration::from_micros(100));
}
r => {
// Non-exhaustive enum.
return Err(Error::Other(
format!("unexpected backup step result {r:?}").into(),
));
}
}
}
}