Clean up Sqlite connection setup, no-parallel reads for in-memory dbs, and add a placeholder storage for persisting apalis queues.

This commit is contained in:
Sebastian Jeltsch
2025-04-22 22:46:53 +02:00
parent 79e6244a19
commit fe2e8cfda4
17 changed files with 236 additions and 141 deletions

2
Cargo.lock generated
View File

@@ -6749,6 +6749,7 @@ dependencies = [
"tower-service",
"tracing",
"tracing-subscriber",
"trailbase-apalis",
"trailbase-assets",
"trailbase-extension",
"trailbase-refinery-core",
@@ -6813,7 +6814,6 @@ dependencies = [
"serde_json",
"tokio",
"trailbase",
"trailbase-sqlite",
"utoipa",
"utoipa-swagger-ui",
"uuid",

View File

@@ -61,6 +61,7 @@ rusqlite = { version = "0.34.0", default-features = false, features = ["bundled"
tokio = { version = "^1.38.0", features = ["macros", "rt-multi-thread", "fs", "signal", "time", "sync"] }
trailbase-refinery-core = { path = "vendor/refinery/refinery_core", version = "0.8.16", default-features = false, features = ["rusqlite-bundled"] }
trailbase-refinery-macros = { path = "vendor/refinery/refinery_macros", version = "0.8.15" }
trailbase-apalis = { path = "trailbase-apalis", version = "0.1.0" }
trailbase-assets = { path = "trailbase-assets", version = "0.1.0" }
trailbase-sqlean = { path = "vendor/sqlean", version = "0.0.2" }
trailbase-extension = { path = "trailbase-extension", version = "0.2.0" }

View File

@@ -74,7 +74,7 @@ impl SqliteStorage<()> {
.execute_batch(
r#"
PRAGMA journal_mode = 'WAL';
PRAGMA temp_store = 2;
PRAGMA temp_store = MEMORY;
PRAGMA synchronous = NORMAL;
PRAGMA cache_size = 64000;
"#,

View File

@@ -17,7 +17,6 @@ chrono = "^0.4.38"
clap = { version = "^4.4.11", features=["derive", "env"] }
env_logger = { workspace = true }
trailbase = { workspace = true }
trailbase-sqlite = { path = "../trailbase-sqlite" }
log = "^0.4.21"
mimalloc = { version = "^0.1.41", default-features = false }
serde = { version = "^1.0.203", features = ["derive"] }

View File

@@ -50,10 +50,7 @@ impl DbUser {
}
}
async fn get_user_by_email(
conn: &trailbase_sqlite::Connection,
email: &str,
) -> Result<DbUser, BoxError> {
async fn get_user_by_email(conn: &api::Connection, email: &str) -> Result<DbUser, BoxError> {
if let Some(user) = conn
.read_query_value::<DbUser>(
format!("SELECT * FROM {USER_TABLE} WHERE email = $1"),
@@ -157,10 +154,7 @@ async fn async_main() -> Result<(), BoxError> {
Some(SubCommands::Admin { cmd }) => {
init_logger(false);
let conn = trailbase_sqlite::Connection::new(
|| api::connect_sqlite(Some(data_dir.main_db_path()), None),
None,
)?;
let (conn, _) = api::init_main_db(Some(&data_dir), None)?;
match cmd {
Some(AdminSubCommands::List) => {
@@ -211,10 +205,7 @@ async fn async_main() -> Result<(), BoxError> {
init_logger(false);
let data_dir = DataDir(args.data_dir);
let conn = trailbase_sqlite::Connection::new(
|| api::connect_sqlite(Some(data_dir.main_db_path()), None),
None,
)?;
let (conn, _) = api::init_main_db(Some(&data_dir), None)?;
match cmd {
Some(UserSubCommands::ResetPassword { email, password }) => {

View File

@@ -78,6 +78,7 @@ tower-http = { version = "^0.6.0", default-features = false, features = ["cors",
tower-service = { version = "0.3.3", default-features = false }
tracing = { version = "0.1.40", default-features = false }
tracing-subscriber = { version = "0.3.18", default-features = false, features = ["smallvec", "std", "fmt", "json"] }
trailbase-apalis = { workspace = true }
trailbase-assets = { workspace = true }
trailbase-extension = { workspace = true }
trailbase-refinery-core = { workspace = true }

View File

@@ -389,7 +389,8 @@ mod tests {
async fn test_aggregate_rate_computation() {
let conn = trailbase_sqlite::Connection::new(
move || -> anyhow::Result<_> {
let mut conn_sync = crate::connection::connect_sqlite(None, None).unwrap();
let mut conn_sync =
crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None).unwrap();
apply_logs_migrations(&mut conn_sync).unwrap();
return Ok(conn_sync);
},

View File

@@ -10,6 +10,7 @@ use crate::config::{validate_config, write_config_and_vault_textproto};
use crate::data_dir::DataDir;
use crate::email::Mailer;
use crate::js::RuntimeHandle;
use crate::queue::QueueStorage;
use crate::records::RecordApi;
use crate::records::subscribe::SubscriptionManager;
use crate::scheduler::{JobRegistry, build_job_registry_from_config};
@@ -31,8 +32,9 @@ struct InternalState {
record_apis: Computed<Vec<(String, RecordApi)>, Config>,
config: ValueNotifier<Config>,
logs_conn: trailbase_sqlite::Connection,
conn: trailbase_sqlite::Connection,
logs_conn: trailbase_sqlite::Connection,
queue: QueueStorage,
jwt: JwtHelper,
@@ -57,6 +59,7 @@ pub(crate) struct AppStateArgs {
pub config: Config,
pub conn: trailbase_sqlite::Connection,
pub logs_conn: trailbase_sqlite::Connection,
pub queue: QueueStorage,
pub jwt: JwtHelper,
pub object_store: Box<dyn ObjectStore + Send + Sync>,
pub js_runtime_threads: Option<usize>,
@@ -141,6 +144,7 @@ impl AppState {
config,
conn: args.conn.clone(),
logs_conn: args.logs_conn,
queue: args.queue,
jwt: args.jwt,
table_metadata: args.table_metadata.clone(),
subscription_manager: SubscriptionManager::new(args.conn, args.table_metadata, record_apis),
@@ -182,6 +186,10 @@ impl AppState {
return &self.state.logs_conn;
}
pub fn queue(&self) -> &QueueStorage {
return &self.state.queue;
}
pub fn version(&self) -> rustc_tools_util::VersionInfo {
return rustc_tools_util::get_version_info!();
}
@@ -314,7 +322,6 @@ pub async fn test_state(options: Option<TestStateOptions>) -> anyhow::Result<App
use crate::auth::oauth::providers::test::TestOAuthProvider;
use crate::config::proto::{OAuthProviderConfig, OAuthProviderId};
use crate::config::validate_config;
use crate::migrations::{apply_logs_migrations, apply_main_migrations, apply_user_migrations};
let _ = env_logger::try_init_from_env(env_logger::Env::new().default_filter_or(
"info,refinery_core=warn,trailbase_refinery_core=warn,log::span=warn,swc_ecma_codegen=off",
@@ -323,28 +330,9 @@ pub async fn test_state(options: Option<TestStateOptions>) -> anyhow::Result<App
let temp_dir = temp_dir::TempDir::new()?;
tokio::fs::create_dir_all(temp_dir.child("uploads")).await?;
let conn = {
trailbase_sqlite::Connection::new(
|| -> anyhow::Result<rusqlite::Connection> {
let mut conn = crate::connection::connect_sqlite(None, None)?;
apply_user_migrations(&mut conn)?;
let _new_db = apply_main_migrations(&mut conn, None)?;
return Ok(conn);
},
None,
)?
};
let logs_conn = {
trailbase_sqlite::Connection::new(
|| -> anyhow::Result<rusqlite::Connection> {
let mut conn = crate::connection::connect_sqlite(None, None)?;
apply_logs_migrations(&mut conn)?;
return Ok(conn);
},
None,
)?
};
let (conn, new) = crate::connection::init_main_db(None, None)?;
assert!(new);
let logs_conn = crate::connection::init_logs_db(None)?;
let table_metadata = TableMetadataCache::new(conn.clone()).await?;
@@ -453,6 +441,7 @@ pub async fn test_state(options: Option<TestStateOptions>) -> anyhow::Result<App
config,
conn: conn.clone(),
logs_conn,
queue: crate::queue::init_queue_storage(None).await.unwrap(),
jwt: jwt::test_jwt_helper(),
table_metadata: table_metadata.clone(),
subscription_manager: SubscriptionManager::new(conn.clone(), table_metadata, record_apis),

View File

@@ -109,9 +109,11 @@ mod tests {
#[tokio::test]
async fn test_some_sqlite_errors_yield_client_errors() {
let conn =
trailbase_sqlite::Connection::new(|| crate::connection::connect_sqlite(None, None), None)
.unwrap();
let conn = trailbase_sqlite::Connection::new(
|| crate::connection::connect_rusqlite_without_default_extensions_and_schemas(None),
None,
)
.unwrap();
conn
.execute(

View File

@@ -1,6 +1,77 @@
use parking_lot::Mutex;
use std::path::PathBuf;
use thiserror::Error;
pub fn connect_sqlite(
use crate::data_dir::DataDir;
use crate::migrations::{apply_logs_migrations, apply_main_migrations};
pub use trailbase_sqlite::Connection;
#[derive(Debug, Error)]
pub enum ConnectionError {
#[error("SQLite ext error: {0}")]
SqliteExtension(#[from] trailbase_extension::Error),
#[error("Rusqlite error: {0}")]
Rusqlite(#[from] rusqlite::Error),
#[error("Migration error: {0}")]
Migration(#[from] trailbase_refinery_core::Error),
}
/// Initializes a new SQLite Connection with all the default extensions, migrations and settings
/// applied.
///
/// Returns a Connection and whether the DB was newly created..
pub fn init_main_db(
data_dir: Option<&DataDir>,
extensions: Option<Vec<PathBuf>>,
) -> Result<(Connection, bool), ConnectionError> {
let new_db = Mutex::new(false);
let main_path = data_dir.map(|d| d.main_db_path());
let migrations_path = data_dir.map(|d| d.migrations_path());
let conn = trailbase_sqlite::Connection::new(
|| -> Result<_, ConnectionError> {
let mut conn = connect_rusqlite(main_path.clone(), extensions.clone())?;
*(new_db.lock()) |= apply_main_migrations(&mut conn, migrations_path.clone())?;
return Ok(conn);
},
Some(trailbase_sqlite::connection::Options {
n_read_threads: match (data_dir, std::thread::available_parallelism()) {
(None, _) => 0,
(Some(_), Ok(n)) => n.get().clamp(2, 4),
(Some(_), Err(_)) => 4,
},
..Default::default()
}),
)?;
return Ok((conn, *new_db.lock()));
}
pub(crate) fn init_logs_db(data_dir: Option<&DataDir>) -> Result<Connection, ConnectionError> {
let path = data_dir.map(|d| d.logs_db_path());
return trailbase_sqlite::Connection::new(
|| -> Result<_, ConnectionError> {
let mut conn = connect_rusqlite_without_default_extensions_and_schemas(path.clone())?;
// Turn off secure_deletions, i.e. don't wipe the memory with zeros.
conn.query_row("PRAGMA secure_delete = FALSE", (), |_row| Ok(()))?;
// Sync less often
conn.execute("PRAGMA synchronous = 1", ())?;
apply_logs_migrations(&mut conn)?;
return Ok(conn);
},
None,
);
}
pub(crate) fn connect_rusqlite(
path: Option<PathBuf>,
extensions: Option<Vec<PathBuf>>,
) -> Result<rusqlite::Connection, trailbase_extension::Error> {
@@ -8,3 +79,22 @@ pub fn connect_sqlite(
return trailbase_extension::connect_sqlite(path, extensions);
}
pub(crate) fn connect_rusqlite_without_default_extensions_and_schemas(
path: Option<PathBuf>,
) -> Result<rusqlite::Connection, rusqlite::Error> {
let conn = if let Some(p) = path {
use rusqlite::OpenFlags;
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_NO_MUTEX;
rusqlite::Connection::open_with_flags(p, flags)?
} else {
rusqlite::Connection::open_in_memory()?
};
trailbase_extension::apply_default_pragmas(&conn)?;
return Ok(conn);
}

View File

@@ -27,6 +27,10 @@ impl DataDir {
return self.data_path().join("logs.db");
}
pub fn queue_db_path(&self) -> PathBuf {
return self.data_path().join("queue.db");
}
pub fn data_path(&self) -> PathBuf {
return self.0.join("data/");
}

View File

@@ -18,6 +18,7 @@ mod extract;
mod js;
mod listing;
mod migrations;
mod queue;
mod scheduler;
mod server;
mod table_metadata;
@@ -61,7 +62,7 @@ pub mod api {
pub use crate::admin::user::{CreateUserRequest, create_user_handler};
pub use crate::auth::api::login::login_with_password;
pub use crate::auth::{JwtHelper, TokenClaims, force_password_reset};
pub use crate::connection::connect_sqlite;
pub use crate::connection::{Connection, init_main_db};
pub use crate::email::{Email, EmailError};
pub use crate::migrations::new_unique_migration_filename;
pub use crate::records::json_schema::build_api_json_schema;

View File

@@ -90,27 +90,6 @@ pub(crate) fn apply_main_migrations(
return Ok(new_db);
}
#[cfg(test)]
pub(crate) fn apply_user_migrations(
user_conn: &mut rusqlite::Connection,
) -> Result<(), trailbase_refinery_core::Error> {
let mut runner = main::migrations::runner();
runner.set_migration_table_name(MIGRATION_TABLE_NAME);
let report = runner.run(user_conn).map_err(|err| {
error!("User migrations: {err}");
return err;
})?;
if cfg!(test) {
debug!("user migrations: {report:?}");
} else {
info!("user migrations: {report:?}");
}
return Ok(());
}
pub(crate) fn apply_logs_migrations(
logs_conn: &mut rusqlite::Connection,
) -> Result<(), trailbase_refinery_core::Error> {

View File

@@ -0,0 +1,41 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;
use trailbase_apalis::sqlite::SqliteStorage;
use crate::data_dir::DataDir;
#[derive(Debug, Error)]
pub enum QueueError {
#[error("SQLite ext error: {0}")]
SqliteExtension(#[from] trailbase_extension::Error),
#[error("SQLite error: {0}")]
Sqlite(#[from] trailbase_sqlite::Error),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum Job {
SendEmail(),
}
pub type QueueStorage = SqliteStorage<Job>;
pub(crate) async fn init_queue_storage(
data_dir: Option<&DataDir>,
) -> Result<QueueStorage, QueueError> {
let queue_path = data_dir.map(|d| d.queue_db_path());
let conn = trailbase_sqlite::Connection::new(
|| -> Result<_, trailbase_sqlite::Error> {
return Ok(
crate::connection::connect_rusqlite_without_default_extensions_and_schemas(
queue_path.clone(),
)?,
);
},
None,
)?;
SqliteStorage::setup(&conn).await?;
let config = trailbase_apalis::Config::new("apalis::test");
return Ok(SqliteStorage::new_with_config(conn, config));
}

View File

@@ -1,30 +1,21 @@
use log::*;
use parking_lot::Mutex;
use std::path::PathBuf;
use std::sync::Arc;
use thiserror::Error;
use crate::app_state::{AppState, AppStateArgs, build_objectstore};
use crate::auth::jwt::{JwtHelper, JwtHelperError};
use crate::config::load_or_init_config_textproto;
use crate::constants::USER_TABLE;
use crate::migrations::{apply_logs_migrations, apply_main_migrations};
use crate::rand::generate_random_string;
use crate::server::DataDir;
use crate::table_metadata::TableMetadataCache;
#[derive(Debug, Error)]
pub enum InitError {
#[error("SQLite extension error: {0}")]
SqliteExtension(#[from] trailbase_extension::Error),
#[error("TB SQLite error: {0}")]
#[error("SQLite error: {0}")]
Sqlite(#[from] trailbase_sqlite::Error),
#[error("Rusqlite error: {0}")]
Rusqlite(#[from] rusqlite::Error),
#[error("RusqliteFromSql error: {0}")]
FromSql(#[from] rusqlite::types::FromSqlError),
#[error("DB Migration error: {0}")]
Migration(#[from] trailbase_refinery_core::Error),
#[error("Connection error: {0}")]
Connection(#[from] crate::connection::ConnectionError),
#[error("IO error: {0}")]
IO(#[from] std::io::Error),
#[error("Config error: {0}")]
@@ -43,6 +34,8 @@ pub enum InitError {
ScriptError(String),
#[error("ObjectStore error: {0}")]
ObjectStore(#[from] object_store::Error),
#[error("Queue error: {0}")]
Queue(#[from] crate::queue::QueueError),
}
#[derive(Default)]
@@ -62,43 +55,14 @@ pub async fn init_app_state(
data_dir.ensure_directory_structure().await?;
// Then open or init new databases.
let logs_conn = {
trailbase_sqlite::Connection::new(
|| -> Result<_, InitError> {
let mut conn = init_logs_db(&data_dir)?;
apply_logs_migrations(&mut conn)?;
return Ok(conn);
},
None,
)?
};
let logs_conn = crate::connection::init_logs_db(Some(&data_dir))?;
// TODO: At this early stage we're using an in-memory db. Go persistent before rolling out.
let queue = crate::queue::init_queue_storage(None).await?;
// Open or init the main db. Note that we derive whether a new DB was initialized based on
// whether the V1 migration had to be applied. Should be fairly robust.
let (conn, new_db): (trailbase_sqlite::Connection, bool) = {
let new_db = Arc::new(Mutex::new(false));
let new_db_clone = new_db.clone();
let conn = trailbase_sqlite::Connection::new(
|| -> Result<_, InitError> {
let mut conn = crate::connection::connect_sqlite(Some(data_dir.main_db_path()), None)?;
*(new_db_clone.lock()) |=
apply_main_migrations(&mut conn, Some(data_dir.migrations_path()))?;
return Ok(conn);
},
Some(trailbase_sqlite::connection::Options {
n_read_threads: if let Ok(n) = std::thread::available_parallelism() {
n.get().clamp(2, 4)
} else {
4
},
..Default::default()
}),
)?;
let new_db: bool = *new_db.lock();
(conn, new_db)
};
let (conn, new_db) = crate::connection::init_main_db(Some(&data_dir), None)?;
let table_metadata = TableMetadataCache::new(conn.clone()).await?;
@@ -158,6 +122,7 @@ pub async fn init_app_state(
config,
conn,
logs_conn,
queue,
jwt,
object_store,
js_runtime_threads: args.js_runtime_threads,
@@ -214,15 +179,3 @@ pub async fn init_app_state(
return Ok((new_db, app_state));
}
fn init_logs_db(data_dir: &DataDir) -> Result<rusqlite::Connection, InitError> {
let conn = crate::connection::connect_sqlite(data_dir.logs_db_path().into(), None)?;
// Turn off secure_deletions, i.e. don't wipe the memory with zeros.
conn.query_row("PRAGMA secure_delete = FALSE", (), |_row| Ok(()))?;
// Sync less often
conn.execute("PRAGMA synchronous = 1", ())?;
return Ok(conn);
}

View File

@@ -19,6 +19,36 @@ pub enum Error {
Other(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
}
pub fn apply_default_pragmas(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
const CONFIG: &[&str] = &[
"PRAGMA busy_timeout = 10000",
"PRAGMA journal_mode = WAL",
"PRAGMA journal_size_limit = 200000000",
// Sync the file system less often.
"PRAGMA synchronous = NORMAL",
"PRAGMA foreign_keys = ON",
"PRAGMA temp_store = MEMORY",
"PRAGMA cache_size = -16000",
// TODO: Maybe worth exploring once we have a benchmark, based on
// https://phiresky.github.io/blog/2020/sqlite-performance-tuning/.
// "PRAGMA mmap_size = 30000000000",
// "PRAGMA page_size = 32768",
// Safety feature around application-defined functions recommended by
// https://sqlite.org/appfunc.html
"PRAGMA trusted_schema = OFF",
];
// NOTE: we're querying here since some pragmas return data.
for pragma in CONFIG {
let mut stmt = conn.prepare(pragma)?;
let mut rows = stmt.query([])?;
let _maybe_row = rows.next()?;
}
return Ok(());
}
#[allow(unsafe_code)]
pub fn connect_sqlite(
path: Option<PathBuf>,

View File

@@ -71,7 +71,29 @@ impl Connection {
builder: impl Fn() -> std::result::Result<rusqlite::Connection, E>,
opt: Option<Options>,
) -> std::result::Result<Self, E> {
let n_read_threads = {
let spawn = |receiver: Receiver<Message>| -> std::result::Result<Option<String>, E> {
let conn = builder()?;
let name = conn.path().and_then(|s| {
// Returns empty string for in-memory databases.
if s.is_empty() {
None
} else {
Some(s.to_string())
}
});
if let Some(timeout) = opt.as_ref().map(|o| o.busy_timeout) {
conn.busy_timeout(timeout).expect("busy timeout failed");
}
std::thread::spawn(move || event_loop(conn, receiver));
return Ok(name);
};
let (shared_write_sender, shared_write_receiver) = crossbeam_channel::unbounded::<Message>();
let name = spawn(shared_write_receiver)?;
let n_read_threads = if name.is_some() {
let n_read_threads = match opt.as_ref().map_or(0, |o| o.n_read_threads) {
1 => {
warn!(
@@ -92,23 +114,11 @@ impl Connection {
}
n_read_threads
} else {
// We cannot share an in-memory database across threads, they're all independent.
0
};
let spawn = |receiver: Receiver<Message>| -> std::result::Result<String, E> {
let conn = builder()?;
let name = conn.path().unwrap_or_default().to_string();
if let Some(timeout) = opt.as_ref().map(|o| o.busy_timeout) {
conn.busy_timeout(timeout).expect("busy timeout failed");
}
std::thread::spawn(move || event_loop(conn, receiver));
return Ok(name);
};
let (shared_write_sender, shared_write_receiver) = crossbeam_channel::unbounded::<Message>();
let name = spawn(shared_write_receiver)?;
let shared_read_sender = if n_read_threads > 0 {
let (shared_read_sender, shared_read_receiver) = crossbeam_channel::unbounded::<Message>();
for _ in 0..n_read_threads {
@@ -119,7 +129,10 @@ impl Connection {
shared_write_sender.clone()
};
debug!("Opened SQLite DB '{name}' with {n_read_threads} dedicated reader threads");
debug!(
"Opened SQLite DB '{name}' with {n_read_threads} dedicated reader threads",
name = name.as_deref().unwrap_or("<in-memory>")
);
return Ok(Self {
reader: shared_read_sender,