Put early queue work and apalis deps behind a "queue" feature.

This commit is contained in:
Sebastian Jeltsch
2025-04-23 12:52:25 +02:00
parent 2908e9fde6
commit 4487d30b79
6 changed files with 115 additions and 19 deletions
Generated
+1
View File
@@ -6686,6 +6686,7 @@ name = "trailbase"
version = "0.1.0"
dependencies = [
"anyhow",
"apalis",
"arc-swap",
"argon2",
"askama",
-1
View File
@@ -15,7 +15,6 @@ members = [
]
default-members = [
"client/trailbase-rs",
"trailbase-apalis",
"trailbase-assets",
"trailbase-cli",
"trailbase-core",
+4 -2
View File
@@ -21,9 +21,11 @@ harness = false
[features]
default = ["v8"]
v8 = ["dep:rustyscript", ]
v8 = ["dep:rustyscript"]
queue = ["dep:apalis", "dep:trailbase-apalis"]
[dependencies]
apalis = { version = "0.7.0", optional = true, default-features = false }
arc-swap = "1.7.1"
argon2 = { version = "^0.5.3", default-features = false, features = ["alloc", "password-hash"] }
askama = { workspace = true }
@@ -78,7 +80,7 @@ tower-http = { version = "^0.6.0", default-features = false, features = ["cors",
tower-service = { version = "0.3.3", default-features = false }
tracing = { version = "0.1.40", default-features = false }
tracing-subscriber = { version = "0.3.18", default-features = false, features = ["smallvec", "std", "fmt", "json"] }
trailbase-apalis = { workspace = true }
trailbase-apalis = { workspace = true, optional = true }
trailbase-assets = { workspace = true }
trailbase-extension = { workspace = true }
trailbase-refinery-core = { workspace = true }
+5 -5
View File
@@ -10,7 +10,7 @@ use crate::config::{validate_config, write_config_and_vault_textproto};
use crate::data_dir::DataDir;
use crate::email::Mailer;
use crate::js::RuntimeHandle;
use crate::queue::QueueStorage;
use crate::queue::Queue;
use crate::records::RecordApi;
use crate::records::subscribe::SubscriptionManager;
use crate::scheduler::{JobRegistry, build_job_registry_from_config};
@@ -34,7 +34,7 @@ struct InternalState {
conn: trailbase_sqlite::Connection,
logs_conn: trailbase_sqlite::Connection,
queue: QueueStorage,
queue: Queue,
jwt: JwtHelper,
@@ -59,7 +59,7 @@ pub(crate) struct AppStateArgs {
pub config: Config,
pub conn: trailbase_sqlite::Connection,
pub logs_conn: trailbase_sqlite::Connection,
pub queue: QueueStorage,
pub queue: Queue,
pub jwt: JwtHelper,
pub object_store: Box<dyn ObjectStore + Send + Sync>,
pub js_runtime_threads: Option<usize>,
@@ -186,7 +186,7 @@ impl AppState {
return &self.state.logs_conn;
}
pub fn queue(&self) -> &QueueStorage {
pub fn queue(&self) -> &Queue {
return &self.state.queue;
}
@@ -441,7 +441,7 @@ pub async fn test_state(options: Option<TestStateOptions>) -> anyhow::Result<App
config,
conn: conn.clone(),
logs_conn,
queue: crate::queue::init_queue_storage(None).await.unwrap(),
queue: Queue::new(None).await.unwrap(),
jwt: jwt::test_jwt_helper(),
table_metadata: table_metadata.clone(),
subscription_manager: SubscriptionManager::new(conn.clone(), table_metadata, record_apis),
+104 -10
View File
@@ -1,6 +1,4 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;
use trailbase_apalis::sqlite::SqliteStorage;
use crate::data_dir::DataDir;
@@ -10,18 +8,71 @@ pub enum QueueError {
SqliteExtension(#[from] trailbase_extension::Error),
#[error("SQLite error: {0}")]
Sqlite(#[from] trailbase_sqlite::Error),
#[error("IO error: {0}")]
IO(#[from] std::io::Error),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum Job {
SendEmail(),
#[cfg(feature = "queue")]
pub(crate) mod queue_impl {
use log::*;
use serde::{Deserialize, Serialize};
use super::QueueError;
#[derive(Debug, Serialize, Deserialize)]
pub enum Job {
Something(),
}
pub async fn handle_job(job: Job) -> Result<(), QueueError> {
match job {
Job::Something() => {
info!("Queue got something");
}
}
return Ok(());
}
#[cfg(feature = "queue")]
pub(crate) type QueueStorage = trailbase_apalis::sqlite::SqliteStorage<Job>;
}
pub type QueueStorage = SqliteStorage<Job>;
#[derive(Clone)]
pub struct Queue {
#[cfg(feature = "queue")]
pub(crate) storage: queue_impl::QueueStorage,
}
impl Queue {
#[allow(unused)]
pub(crate) async fn new(data_dir: Option<&DataDir>) -> Result<Self, QueueError> {
return Ok(Self {
#[cfg(feature = "queue")]
storage: init_queue_storage(data_dir).await?,
});
}
#[cfg(feature = "queue")]
#[allow(unused)]
pub(crate) async fn run(&self) -> Result<(), QueueError> {
use apalis::prelude::*;
let monitor = Monitor::new().register({
WorkerBuilder::new("default-worker")
// .enable_tracing()
.backend(self.storage.clone())
.build_fn(queue_impl::handle_job)
});
return Ok(monitor.run().await?);
}
}
#[cfg(feature = "queue")]
pub(crate) async fn init_queue_storage(
data_dir: Option<&DataDir>,
) -> Result<QueueStorage, QueueError> {
) -> Result<queue_impl::QueueStorage, QueueError> {
let queue_path = data_dir.map(|d| d.queue_db_path());
let conn = trailbase_sqlite::Connection::new(
|| -> Result<_, trailbase_sqlite::Error> {
@@ -34,8 +85,51 @@ pub(crate) async fn init_queue_storage(
None,
)?;
SqliteStorage::setup(&conn).await?;
trailbase_apalis::sqlite::SqliteStorage::setup(&conn).await?;
let config = trailbase_apalis::Config::new("apalis::test");
return Ok(SqliteStorage::new_with_config(conn, config));
let config = trailbase_apalis::Config::new("ns::trailbase");
return Ok(queue_impl::QueueStorage::new_with_config(conn, config));
}
#[cfg(test)]
#[cfg(feature = "queue")]
mod tests {
use super::queue_impl::*;
use super::*;
use apalis::prelude::*;
#[tokio::test]
async fn test_queue() {
let mut queue = Queue::new(None).await.unwrap();
let (sender, receiver) = async_channel::unbounded::<()>();
let storage = queue.storage.clone();
let _ = tokio::spawn(async move {
let monitor = Monitor::new().register({
WorkerBuilder::new("default-worker")
.data(sender)
.backend(storage)
.build_fn(
async |job: Job, sender: Data<async_channel::Sender<()>>| -> Result<(), QueueError> {
match job {
Job::Something() => sender.send(()).await.unwrap(),
}
return Ok(());
},
)
});
return monitor.run().await;
});
let job = queue.storage.push(Job::Something()).await.unwrap();
let entry = queue.storage.fetch_by_id(&job.task_id).await.unwrap();
assert!(entry.is_some());
receiver.recv().await.unwrap();
}
}
+1 -1
View File
@@ -58,7 +58,7 @@ pub async fn init_app_state(
let logs_conn = crate::connection::init_logs_db(Some(&data_dir))?;
// TODO: At this early stage we're using an in-memory db. Go persistent before rolling out.
let queue = crate::queue::init_queue_storage(None).await?;
let queue = crate::queue::Queue::new(None).await?;
// Open or init the main db. Note that we derive whether a new DB was initialized based on
// whether the V1 migration had to be applied. Should be fairly robust.