diff --git a/Cargo.lock b/Cargo.lock
index 1fa8fac2..28757b85 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6749,6 +6749,7 @@ dependencies = [
  "tower-service",
  "tracing",
  "tracing-subscriber",
+ "trailbase-apalis",
  "trailbase-assets",
  "trailbase-extension",
  "trailbase-refinery-core",
@@ -6813,7 +6814,6 @@ dependencies = [
  "serde_json",
  "tokio",
  "trailbase",
- "trailbase-sqlite",
  "utoipa",
  "utoipa-swagger-ui",
  "uuid",
diff --git a/Cargo.toml b/Cargo.toml
index df6b5531..c8b705cb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -61,6 +61,7 @@ rusqlite = { version = "0.34.0", default-features = false, features = ["bundled"
 tokio = { version = "^1.38.0", features = ["macros", "rt-multi-thread", "fs", "signal", "time", "sync"] }
 trailbase-refinery-core = { path = "vendor/refinery/refinery_core", version = "0.8.16", default-features = false, features = ["rusqlite-bundled"] }
 trailbase-refinery-macros = { path = "vendor/refinery/refinery_macros", version = "0.8.15" }
+trailbase-apalis = { path = "trailbase-apalis", version = "0.1.0" }
 trailbase-assets = { path = "trailbase-assets", version = "0.1.0" }
 trailbase-sqlean = { path = "vendor/sqlean", version = "0.0.2" }
 trailbase-extension = { path = "trailbase-extension", version = "0.2.0" }
diff --git a/trailbase-apalis/src/sqlite.rs b/trailbase-apalis/src/sqlite.rs
index 692e5743..e7141715 100644
--- a/trailbase-apalis/src/sqlite.rs
+++ b/trailbase-apalis/src/sqlite.rs
@@ -74,7 +74,7 @@ impl SqliteStorage<()> {
       .execute_batch(
         r#"
         PRAGMA journal_mode = 'WAL';
-        PRAGMA temp_store = 2;
+        PRAGMA temp_store = MEMORY;
         PRAGMA synchronous = NORMAL;
         PRAGMA cache_size = 64000;
       "#,
diff --git a/trailbase-cli/Cargo.toml b/trailbase-cli/Cargo.toml
index 7abac8ce..30f9c262 100644
--- a/trailbase-cli/Cargo.toml
+++ b/trailbase-cli/Cargo.toml
@@ -17,7 +17,6 @@ chrono = "^0.4.38"
 clap = { version = "^4.4.11", features=["derive", "env"] }
 env_logger = { workspace = true }
 trailbase = { workspace = true }
-trailbase-sqlite = { path = "../trailbase-sqlite" }
 log = "^0.4.21"
 mimalloc = { version = "^0.1.41", default-features = false }
 serde = { version = "^1.0.203", features = ["derive"] }
diff --git a/trailbase-cli/src/bin/trail.rs b/trailbase-cli/src/bin/trail.rs
index 5fe20a69..a54fc6bb 100644
--- a/trailbase-cli/src/bin/trail.rs
+++ b/trailbase-cli/src/bin/trail.rs
@@ -50,10 +50,7 @@ impl DbUser {
   }
 }
 
-async fn get_user_by_email(
-  conn: &trailbase_sqlite::Connection,
-  email: &str,
-) -> Result {
+async fn get_user_by_email(conn: &api::Connection, email: &str) -> Result {
   if let Some(user) = conn
     .read_query_value::(
       format!("SELECT * FROM {USER_TABLE} WHERE email = $1"),
@@ -157,10 +154,7 @@ async fn async_main() -> Result<(), BoxError> {
     Some(SubCommands::Admin { cmd }) => {
       init_logger(false);
 
-      let conn = trailbase_sqlite::Connection::new(
-        || api::connect_sqlite(Some(data_dir.main_db_path()), None),
-        None,
-      )?;
+      let (conn, _) = api::init_main_db(Some(&data_dir), None)?;
 
       match cmd {
         Some(AdminSubCommands::List) => {
@@ -211,10 +205,7 @@ async fn async_main() -> Result<(), BoxError> {
       init_logger(false);
       let data_dir = DataDir(args.data_dir);
 
-      let conn = trailbase_sqlite::Connection::new(
-        || api::connect_sqlite(Some(data_dir.main_db_path()), None),
-        None,
-      )?;
+      let (conn, _) = api::init_main_db(Some(&data_dir), None)?;
 
       match cmd {
         Some(UserSubCommands::ResetPassword { email, password }) => {
diff --git a/trailbase-core/Cargo.toml b/trailbase-core/Cargo.toml
index 22b0214b..d30fb02b 100644
--- a/trailbase-core/Cargo.toml
+++ b/trailbase-core/Cargo.toml
@@ -78,6 +78,7 @@ tower-http = { version = "^0.6.0", default-features = false, features = ["cors",
 tower-service = { version = "0.3.3", default-features = false }
 tracing = { version = "0.1.40", default-features = false }
 tracing-subscriber = { version = "0.3.18", default-features = false, features = ["smallvec", "std", "fmt", "json"] }
+trailbase-apalis = { workspace = true }
 trailbase-assets = { workspace = true }
 trailbase-extension = { workspace = true }
 trailbase-refinery-core = { workspace = true }
diff --git a/trailbase-core/src/admin/list_logs.rs b/trailbase-core/src/admin/list_logs.rs
index a4827732..6eade205 100644
--- a/trailbase-core/src/admin/list_logs.rs
+++ b/trailbase-core/src/admin/list_logs.rs
@@ -389,7 +389,8 @@ mod tests {
   async fn test_aggregate_rate_computation() {
     let conn = trailbase_sqlite::Connection::new(
       move || -> anyhow::Result<_> {
-        let mut conn_sync = crate::connection::connect_sqlite(None, None).unwrap();
+        let mut conn_sync =
+          crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None).unwrap();
         apply_logs_migrations(&mut conn_sync).unwrap();
         return Ok(conn_sync);
       },
diff --git a/trailbase-core/src/app_state.rs b/trailbase-core/src/app_state.rs
index edbab0df..ea885321 100644
--- a/trailbase-core/src/app_state.rs
+++ b/trailbase-core/src/app_state.rs
@@ -10,6 +10,7 @@ use crate::config::{validate_config, write_config_and_vault_textproto};
 use crate::data_dir::DataDir;
 use crate::email::Mailer;
 use crate::js::RuntimeHandle;
+use crate::queue::QueueStorage;
 use crate::records::RecordApi;
 use crate::records::subscribe::SubscriptionManager;
 use crate::scheduler::{JobRegistry, build_job_registry_from_config};
@@ -31,8 +32,9 @@ struct InternalState {
   record_apis: Computed, Config>,
   config: ValueNotifier,
 
-  logs_conn: trailbase_sqlite::Connection,
   conn: trailbase_sqlite::Connection,
+  logs_conn: trailbase_sqlite::Connection,
+  queue: QueueStorage,
 
   jwt: JwtHelper,
@@ -57,6 +59,7 @@ pub(crate) struct AppStateArgs {
   pub config: Config,
   pub conn: trailbase_sqlite::Connection,
   pub logs_conn: trailbase_sqlite::Connection,
+  pub queue: QueueStorage,
   pub jwt: JwtHelper,
   pub object_store: Box,
   pub js_runtime_threads: Option,
@@ -141,6 +144,7 @@ impl AppState {
       config,
       conn: args.conn.clone(),
       logs_conn: args.logs_conn,
+      queue: args.queue,
       jwt: args.jwt,
       table_metadata: args.table_metadata.clone(),
       subscription_manager: SubscriptionManager::new(args.conn, args.table_metadata, record_apis),
@@ -182,6 +186,10 @@ impl AppState {
     return &self.state.logs_conn;
   }
 
+  pub fn queue(&self) -> &QueueStorage {
+    return &self.state.queue;
+  }
+
   pub fn version(&self) -> rustc_tools_util::VersionInfo {
     return rustc_tools_util::get_version_info!();
   }
@@ -314,7 +322,6 @@ pub async fn test_state(options: Option) -> anyhow::Result) -> anyhow::Result anyhow::Result {
-        let mut conn = crate::connection::connect_sqlite(None, None)?;
-        apply_user_migrations(&mut conn)?;
-        let _new_db = apply_main_migrations(&mut conn, None)?;
-        return Ok(conn);
-      },
-      None,
-    )?
-  };
-
-  let logs_conn = {
-    trailbase_sqlite::Connection::new(
-      || -> anyhow::Result {
-        let mut conn = crate::connection::connect_sqlite(None, None)?;
-        apply_logs_migrations(&mut conn)?;
-        return Ok(conn);
-      },
-      None,
-    )?
-  };
+  let (conn, new) = crate::connection::init_main_db(None, None)?;
+  assert!(new);
+  let logs_conn = crate::connection::init_logs_db(None)?;
 
   let table_metadata = TableMetadataCache::new(conn.clone()).await?;
 
@@ -453,6 +441,7 @@ pub async fn test_state(options: Option) -> anyhow::Result
diff --git a/trailbase-core/src/connection.rs b/trailbase-core/src/connection.rs
--- a/trailbase-core/src/connection.rs
+++ b/trailbase-core/src/connection.rs
+pub fn init_main_db(
+  data_dir: Option<&DataDir>,
+  extensions: Option>,
+) -> Result<(Connection, bool), ConnectionError> {
+  let new_db = Mutex::new(false);
+
+  let main_path = data_dir.map(|d| d.main_db_path());
+  let migrations_path = data_dir.map(|d| d.migrations_path());
+
+  let conn = trailbase_sqlite::Connection::new(
+    || -> Result<_, ConnectionError> {
+      let mut conn = connect_rusqlite(main_path.clone(), extensions.clone())?;
+
+      *(new_db.lock()) |= apply_main_migrations(&mut conn, migrations_path.clone())?;
+
+      return Ok(conn);
+    },
+    Some(trailbase_sqlite::connection::Options {
+      n_read_threads: match (data_dir, std::thread::available_parallelism()) {
+        (None, _) => 0,
+        (Some(_), Ok(n)) => n.get().clamp(2, 4),
+        (Some(_), Err(_)) => 4,
+      },
+      ..Default::default()
+    }),
+  )?;
+
+  return Ok((conn, *new_db.lock()));
+}
+
+pub(crate) fn init_logs_db(data_dir: Option<&DataDir>) -> Result {
+  let path = data_dir.map(|d| d.logs_db_path());
+
+  return trailbase_sqlite::Connection::new(
+    || -> Result<_, ConnectionError> {
+      let mut conn = connect_rusqlite_without_default_extensions_and_schemas(path.clone())?;
+
+      // Turn off secure_deletions, i.e. don't wipe the memory with zeros.
+      conn.query_row("PRAGMA secure_delete = FALSE", (), |_row| Ok(()))?;
+
+      // Sync less often
+      conn.execute("PRAGMA synchronous = 1", ())?;
+
+      apply_logs_migrations(&mut conn)?;
+      return Ok(conn);
+    },
+    None,
+  );
+}
+
+pub(crate) fn connect_rusqlite(
   path: Option,
   extensions: Option>,
 ) -> Result {
@@ -8,3 +79,22 @@
 
   return trailbase_extension::connect_sqlite(path, extensions);
 }
+
+pub(crate) fn connect_rusqlite_without_default_extensions_and_schemas(
+  path: Option,
+) -> Result {
+  let conn = if let Some(p) = path {
+    use rusqlite::OpenFlags;
+    let flags = OpenFlags::SQLITE_OPEN_READ_WRITE
+      | OpenFlags::SQLITE_OPEN_CREATE
+      | OpenFlags::SQLITE_OPEN_NO_MUTEX;
+
+    rusqlite::Connection::open_with_flags(p, flags)?
+  } else {
+    rusqlite::Connection::open_in_memory()?
+  };
+
+  trailbase_extension::apply_default_pragmas(&conn)?;
+
+  return Ok(conn);
+}
diff --git a/trailbase-core/src/data_dir.rs b/trailbase-core/src/data_dir.rs
index 01944ddb..04fd2473 100644
--- a/trailbase-core/src/data_dir.rs
+++ b/trailbase-core/src/data_dir.rs
@@ -27,6 +27,10 @@ impl DataDir {
     return self.data_path().join("logs.db");
   }
 
+  pub fn queue_db_path(&self) -> PathBuf {
+    return self.data_path().join("queue.db");
+  }
+
   pub fn data_path(&self) -> PathBuf {
     return self.0.join("data/");
   }
diff --git a/trailbase-core/src/lib.rs b/trailbase-core/src/lib.rs
index 6904bc6f..ba7d69ad 100644
--- a/trailbase-core/src/lib.rs
+++ b/trailbase-core/src/lib.rs
@@ -18,6 +18,7 @@ mod extract;
 mod js;
 mod listing;
 mod migrations;
+mod queue;
 mod scheduler;
 mod server;
 mod table_metadata;
@@ -61,7 +62,7 @@ pub mod api {
   pub use crate::admin::user::{CreateUserRequest, create_user_handler};
   pub use crate::auth::api::login::login_with_password;
   pub use crate::auth::{JwtHelper, TokenClaims, force_password_reset};
-  pub use crate::connection::connect_sqlite;
+  pub use crate::connection::{Connection, init_main_db};
   pub use crate::email::{Email, EmailError};
   pub use crate::migrations::new_unique_migration_filename;
   pub use crate::records::json_schema::build_api_json_schema;
diff --git a/trailbase-core/src/migrations.rs b/trailbase-core/src/migrations.rs
index 9919f6bf..91438ed1 100644
--- a/trailbase-core/src/migrations.rs
+++ b/trailbase-core/src/migrations.rs
@@ -90,27 +90,6 @@ pub(crate) fn apply_main_migrations(
   return Ok(new_db);
 }
 
-#[cfg(test)]
-pub(crate) fn apply_user_migrations(
-  user_conn: &mut rusqlite::Connection,
-) -> Result<(), trailbase_refinery_core::Error> {
-  let mut runner = main::migrations::runner();
-  runner.set_migration_table_name(MIGRATION_TABLE_NAME);
-
-  let report = runner.run(user_conn).map_err(|err| {
-    error!("User migrations: {err}");
-    return err;
-  })?;
-
-  if cfg!(test) {
-    debug!("user migrations: {report:?}");
-  } else {
-    info!("user migrations: {report:?}");
-  }
-
-  return Ok(());
-}
-
 pub(crate) fn apply_logs_migrations(
   logs_conn: &mut rusqlite::Connection,
 ) -> Result<(), trailbase_refinery_core::Error> {
diff --git a/trailbase-core/src/queue.rs b/trailbase-core/src/queue.rs
new file mode 100644
index 00000000..f93157d8
--- /dev/null
+++ b/trailbase-core/src/queue.rs
@@ -0,0 +1,41 @@
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+use trailbase_apalis::sqlite::SqliteStorage;
+
+use crate::data_dir::DataDir;
+
+#[derive(Debug, Error)]
+pub enum QueueError {
+  #[error("SQLite ext error: {0}")]
+  SqliteExtension(#[from] trailbase_extension::Error),
+  #[error("SQLite error: {0}")]
+  Sqlite(#[from] trailbase_sqlite::Error),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum Job {
+  SendEmail(),
+}
+
+pub type QueueStorage = SqliteStorage;
+
+pub(crate) async fn init_queue_storage(
+  data_dir: Option<&DataDir>,
+) -> Result {
+  let queue_path = data_dir.map(|d| d.queue_db_path());
+  let conn = trailbase_sqlite::Connection::new(
+    || -> Result<_, trailbase_sqlite::Error> {
+      return Ok(
+        crate::connection::connect_rusqlite_without_default_extensions_and_schemas(
+          queue_path.clone(),
+        )?,
+      );
+    },
+    None,
+  )?;
+
+  SqliteStorage::setup(&conn).await?;
+
+  let config = trailbase_apalis::Config::new("apalis::test");
+  return Ok(SqliteStorage::new_with_config(conn, config));
+}
diff --git a/trailbase-core/src/server/init.rs b/trailbase-core/src/server/init.rs
index c7eb0f57..2e3b0827 100644
--- a/trailbase-core/src/server/init.rs
+++ b/trailbase-core/src/server/init.rs
@@ -1,30 +1,21 @@
 use log::*;
-use parking_lot::Mutex;
 use std::path::PathBuf;
-use std::sync::Arc;
 use thiserror::Error;
 
 use crate::app_state::{AppState, AppStateArgs, build_objectstore};
 use crate::auth::jwt::{JwtHelper, JwtHelperError};
 use crate::config::load_or_init_config_textproto;
 use crate::constants::USER_TABLE;
-use crate::migrations::{apply_logs_migrations, apply_main_migrations};
 use crate::rand::generate_random_string;
 use crate::server::DataDir;
 use crate::table_metadata::TableMetadataCache;
 
 #[derive(Debug, Error)]
 pub enum InitError {
-  #[error("SQLite extension error: {0}")]
-  SqliteExtension(#[from] trailbase_extension::Error),
-  #[error("TB SQLite error: {0}")]
+  #[error("SQLite error: {0}")]
   Sqlite(#[from] trailbase_sqlite::Error),
-  #[error("Rusqlite error: {0}")]
-  Rusqlite(#[from] rusqlite::Error),
-  #[error("RusqliteFromSql error: {0}")]
-  FromSql(#[from] rusqlite::types::FromSqlError),
-  #[error("DB Migration error: {0}")]
-  Migration(#[from] trailbase_refinery_core::Error),
+  #[error("Connection error: {0}")]
+  Connection(#[from] crate::connection::ConnectionError),
   #[error("IO error: {0}")]
   IO(#[from] std::io::Error),
   #[error("Config error: {0}")]
@@ -43,6 +34,8 @@ pub enum InitError {
   ScriptError(String),
   #[error("ObjectStore error: {0}")]
   ObjectStore(#[from] object_store::Error),
+  #[error("Queue error: {0}")]
+  Queue(#[from] crate::queue::QueueError),
 }
 
 #[derive(Default)]
@@ -62,43 +55,14 @@ pub async fn init_app_state(
   data_dir.ensure_directory_structure().await?;
 
   // Then open or init new databases.
-  let logs_conn = {
-    trailbase_sqlite::Connection::new(
-      || -> Result<_, InitError> {
-        let mut conn = init_logs_db(&data_dir)?;
-        apply_logs_migrations(&mut conn)?;
-        return Ok(conn);
-      },
-      None,
-    )?
-  };
+  let logs_conn = crate::connection::init_logs_db(Some(&data_dir))?;
+
+  // TODO: At this early stage we're using an in-memory db. Go persistent before rolling out.
+  let queue = crate::queue::init_queue_storage(None).await?;
 
   // Open or init the main db. Note that we derive whether a new DB was initialized based on
   // whether the V1 migration had to be applied. Should be fairly robust.
-  let (conn, new_db): (trailbase_sqlite::Connection, bool) = {
-    let new_db = Arc::new(Mutex::new(false));
-
-    let new_db_clone = new_db.clone();
-    let conn = trailbase_sqlite::Connection::new(
-      || -> Result<_, InitError> {
-        let mut conn = crate::connection::connect_sqlite(Some(data_dir.main_db_path()), None)?;
-        *(new_db_clone.lock()) |=
-          apply_main_migrations(&mut conn, Some(data_dir.migrations_path()))?;
-        return Ok(conn);
-      },
-      Some(trailbase_sqlite::connection::Options {
-        n_read_threads: if let Ok(n) = std::thread::available_parallelism() {
-          n.get().clamp(2, 4)
-        } else {
-          4
-        },
-        ..Default::default()
-      }),
-    )?;
-
-    let new_db: bool = *new_db.lock();
-    (conn, new_db)
-  };
+  let (conn, new_db) = crate::connection::init_main_db(Some(&data_dir), None)?;
 
   let table_metadata = TableMetadataCache::new(conn.clone()).await?;
 
@@ -158,6 +122,7 @@ pub async fn init_app_state(
     config,
     conn,
     logs_conn,
+    queue,
    jwt,
    object_store,
    js_runtime_threads: args.js_runtime_threads,
@@ -214,15 +179,3 @@ pub async fn init_app_state(
 
   return Ok((new_db, app_state));
 }
-
-fn init_logs_db(data_dir: &DataDir) -> Result {
-  let conn = crate::connection::connect_sqlite(data_dir.logs_db_path().into(), None)?;
-
-  // Turn off secure_deletions, i.e. don't wipe the memory with zeros.
- conn.query_row("PRAGMA secure_delete = FALSE", (), |_row| Ok(()))?; - - // Sync less often - conn.execute("PRAGMA synchronous = 1", ())?; - - return Ok(conn); -} diff --git a/trailbase-extension/src/lib.rs b/trailbase-extension/src/lib.rs index b446f140..2f81c3b6 100644 --- a/trailbase-extension/src/lib.rs +++ b/trailbase-extension/src/lib.rs @@ -19,6 +19,36 @@ pub enum Error { Other(#[source] Box), } +pub fn apply_default_pragmas(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> { + const CONFIG: &[&str] = &[ + "PRAGMA busy_timeout = 10000", + "PRAGMA journal_mode = WAL", + "PRAGMA journal_size_limit = 200000000", + // Sync the file system less often. + "PRAGMA synchronous = NORMAL", + "PRAGMA foreign_keys = ON", + "PRAGMA temp_store = MEMORY", + "PRAGMA cache_size = -16000", + // TODO: Maybe worth exploring once we have a benchmark, based on + // https://phiresky.github.io/blog/2020/sqlite-performance-tuning/. + // "PRAGMA mmap_size = 30000000000", + // "PRAGMA page_size = 32768", + + // Safety feature around application-defined functions recommended by + // https://sqlite.org/appfunc.html + "PRAGMA trusted_schema = OFF", + ]; + + // NOTE: we're querying here since some pragmas return data. + for pragma in CONFIG { + let mut stmt = conn.prepare(pragma)?; + let mut rows = stmt.query([])?; + let _maybe_row = rows.next()?; + } + + return Ok(()); +} + #[allow(unsafe_code)] pub fn connect_sqlite( path: Option, diff --git a/trailbase-sqlite/src/connection.rs b/trailbase-sqlite/src/connection.rs index 443cf147..696750ad 100644 --- a/trailbase-sqlite/src/connection.rs +++ b/trailbase-sqlite/src/connection.rs @@ -71,7 +71,29 @@ impl Connection { builder: impl Fn() -> std::result::Result, opt: Option, ) -> std::result::Result { - let n_read_threads = { + let spawn = |receiver: Receiver| -> std::result::Result, E> { + let conn = builder()?; + let name = conn.path().and_then(|s| { + // Returns empty string for in-memory databases. + if s.is_empty() { + None + } else { + Some(s.to_string()) + } + }); + if let Some(timeout) = opt.as_ref().map(|o| o.busy_timeout) { + conn.busy_timeout(timeout).expect("busy timeout failed"); + } + + std::thread::spawn(move || event_loop(conn, receiver)); + + return Ok(name); + }; + + let (shared_write_sender, shared_write_receiver) = crossbeam_channel::unbounded::(); + let name = spawn(shared_write_receiver)?; + + let n_read_threads = if name.is_some() { let n_read_threads = match opt.as_ref().map_or(0, |o| o.n_read_threads) { 1 => { warn!( @@ -92,23 +114,11 @@ impl Connection { } n_read_threads + } else { + // We cannot share an in-memory database across threads, they're all independent. 
+      0
    };
 
-    let spawn = |receiver: Receiver| -> std::result::Result {
-      let conn = builder()?;
-      let name = conn.path().unwrap_or_default().to_string();
-      if let Some(timeout) = opt.as_ref().map(|o| o.busy_timeout) {
-        conn.busy_timeout(timeout).expect("busy timeout failed");
-      }
-
-      std::thread::spawn(move || event_loop(conn, receiver));
-
-      return Ok(name);
-    };
-
-    let (shared_write_sender, shared_write_receiver) = crossbeam_channel::unbounded::();
-    let name = spawn(shared_write_receiver)?;
-
    let shared_read_sender = if n_read_threads > 0 {
      let (shared_read_sender, shared_read_receiver) = crossbeam_channel::unbounded::();
      for _ in 0..n_read_threads {
@@ -119,7 +129,10 @@
      shared_write_sender.clone()
    };
 
-    debug!("Opened SQLite DB '{name}' with {n_read_threads} dedicated reader threads");
+    debug!(
+      "Opened SQLite DB '{name}' with {n_read_threads} dedicated reader threads",
+      name = name.as_deref().unwrap_or("")
+    );
 
    return Ok(Self {
      reader: shared_read_sender,
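
For orientation, a minimal sketch of the call pattern this patch converges on, mirroring the trail.rs hunks above. It assumes the trailbase::api re-exports shown in the lib.rs hunk; the DataDir import path, function name, and error handling are illustrative and not part of the patch.

// Sketch only: open the main database through the re-exported helper instead of
// hand-building a trailbase_sqlite::Connection as the old CLI code did.
use trailbase::api;
use trailbase::DataDir; // assumed export location; the diff does not show it

fn open_main_db(data_dir: &DataDir) -> Result<api::Connection, Box<dyn std::error::Error>> {
  // Opens (or creates) the main database under the data dir, applies migrations,
  // and reports whether a brand-new database was initialized.
  let (conn, new_db) = api::init_main_db(Some(data_dir), None)?;
  if new_db {
    eprintln!("initialized a new main database");
  }
  Ok(conn)
}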