From 63c330c4e51daf5fb7309ffa3307414c854fb9be Mon Sep 17 00:00:00 2001 From: Sebastian Jeltsch Date: Fri, 18 Apr 2025 10:48:38 +0200 Subject: [PATCH] Add a custom PoC apalis queue storage backend. Forked from apalis-sql's sqlite implementation. --- Cargo.lock | 81 ++ Cargo.toml | 2 + trailbase-apalis/Cargo.toml | 43 + trailbase-apalis/migrations/V1__initial.sql | 38 + trailbase-apalis/src/context.rs | 128 +++ trailbase-apalis/src/lib.rs | 324 +++++++ trailbase-apalis/src/sqlite.rs | 975 ++++++++++++++++++++ 7 files changed, 1591 insertions(+) create mode 100644 trailbase-apalis/Cargo.toml create mode 100644 trailbase-apalis/migrations/V1__initial.sql create mode 100644 trailbase-apalis/src/context.rs create mode 100644 trailbase-apalis/src/lib.rs create mode 100644 trailbase-apalis/src/sqlite.rs diff --git a/Cargo.lock b/Cargo.lock index e59d9cdf..e3befef2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -198,6 +198,38 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "apalis" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f3c13b1a2646be21cfd231fc8835dedbc7d5d648d68297fd2e97d75758bee97" +dependencies = [ + "apalis-core", + "futures", + "pin-project-lite", + "serde", + "thiserror 2.0.12", + "tower 0.5.2", + "tracing", + "tracing-futures", +] + +[[package]] +name = "apalis-core" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b88bff4c16a47ab5d8907985957f31b0a937aa1b212dd4d6ab57a8706cf1d9c0" +dependencies = [ + "futures", + "futures-timer", + "pin-project-lite", + "serde", + "serde_json", + "thiserror 2.0.12", + "tower 0.5.2", + "ulid", +] + [[package]] name = "arbitrary" version = "1.4.1" @@ -2444,6 +2476,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -6616,6 +6654,15 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "tracing", +] + [[package]] name = "tracing-serde" version = "0.2.0" @@ -6722,6 +6769,30 @@ dependencies = [ "validator", ] +[[package]] +name = "trailbase-apalis" +version = "0.1.0" +dependencies = [ + "apalis", + "apalis-core", + "async-std", + "async-stream", + "chrono", + "futures", + "futures-lite", + "log", + "once_cell", + "serde", + "serde_json", + "serde_rusqlite", + "thiserror 2.0.12", + "tokio", + "trailbase-apalis", + "trailbase-refinery-core", + "trailbase-refinery-macros", + "trailbase-sqlite", +] + [[package]] name = "trailbase-assets" version = "0.1.0" @@ -6935,6 +7006,16 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +[[package]] +name = "ulid" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" +dependencies = [ + "rand 0.9.0", + 
"web-time", +] + [[package]] name = "uncased" version = "0.9.10" diff --git a/Cargo.toml b/Cargo.toml index 49d0ce7f..f0f86a9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "client/trailbase-rs", "docs/examples/record_api_rs", "examples/custom-binary", + "trailbase-apalis", "trailbase-assets", "trailbase-cli", "trailbase-core", @@ -14,6 +15,7 @@ members = [ ] default-members = [ "client/trailbase-rs", + "trailbase-apalis", "trailbase-assets", "trailbase-cli", "trailbase-core", diff --git a/trailbase-apalis/Cargo.toml b/trailbase-apalis/Cargo.toml new file mode 100644 index 00000000..e52c39fb --- /dev/null +++ b/trailbase-apalis/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "trailbase-apalis" +version = "0.1.0" +edition = "2021" +publish = false + +license = "OSL-3.0" +description = "SQLite SQL storage for Apalis' background job processing" + +[features] +default = [] + +[dependencies] +apalis-core = { version = "0.7.0", default-features = false, features = [ + "sleep", + "json", +] } +async-std = { version = "1.13.0", optional = true } +async-stream = "0.3.5" +chrono = { version = "0.4", features = ["serde"] } +futures = "0.3.30" +futures-lite = "2.3.0" +log = "0.4.21" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +serde_rusqlite = "0.38.0" +thiserror = "2.0.0" +tokio = { workspace = true } +trailbase-refinery-core = { workspace = true } +trailbase-refinery-macros = { workspace = true } +trailbase-sqlite = { workspace = true } + +[dev-dependencies] +apalis = { version = "0.7.0" } +apalis-core = { version = "0.7.0", default-features = false, features = [ "test-utils" ] } +once_cell = "1.19.0" +tokio = { workspace = true } +trailbase-apalis = { path = "." } + +[package.metadata.docs.rs] +# defines the configuration attribute `docsrs` +rustdoc-args = ["--cfg", "docsrs"] +all-features = true diff --git a/trailbase-apalis/migrations/V1__initial.sql b/trailbase-apalis/migrations/V1__initial.sql new file mode 100644 index 00000000..245e2142 --- /dev/null +++ b/trailbase-apalis/migrations/V1__initial.sql @@ -0,0 +1,38 @@ +CREATE TABLE IF NOT EXISTS Workers ( + id TEXT NOT NULL UNIQUE, + worker_type TEXT NOT NULL, + storage_name TEXT NOT NULL, + layers TEXT, + last_seen INTEGER NOT NULL DEFAULT (strftime('%s', 'now')) +); + +CREATE INDEX IF NOT EXISTS Idx ON Workers(id); + +CREATE INDEX IF NOT EXISTS WTIdx ON Workers(worker_type); + +CREATE INDEX IF NOT EXISTS LSIdx ON Workers(last_seen); + +CREATE TABLE IF NOT EXISTS Jobs ( + job TEXT NOT NULL, + id TEXT NOT NULL UNIQUE, + job_type TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'Pending', + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 25, + run_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + last_error TEXT, + lock_at INTEGER, + lock_by TEXT, + done_at INTEGER, + priority INTEGER NOT NULL DEFAULT 0, + + FOREIGN KEY(lock_by) REFERENCES Workers(id) +); + +CREATE INDEX IF NOT EXISTS TIdx ON Jobs(id); + +CREATE INDEX IF NOT EXISTS SIdx ON Jobs(status); + +CREATE INDEX IF NOT EXISTS LIdx ON Jobs(lock_by); + +CREATE INDEX IF NOT EXISTS JTIdx ON Jobs(job_type); diff --git a/trailbase-apalis/src/context.rs b/trailbase-apalis/src/context.rs new file mode 100644 index 00000000..cbc7284d --- /dev/null +++ b/trailbase-apalis/src/context.rs @@ -0,0 +1,128 @@ +use apalis_core::request::Request; +use apalis_core::service_fn::FromRequest; +use apalis_core::worker::WorkerId; +use apalis_core::{error::Error, request::State}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, 
Serialize};
+
+/// The execution context attached to a job when it is stored in a SQL
+/// backend.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SqlContext {
+  status: State,
+  run_at: DateTime<Utc>,
+  max_attempts: i32,
+  last_error: Option<String>,
+  lock_at: Option<i64>,
+  lock_by: Option<WorkerId>,
+  done_at: Option<i64>,
+  priority: i32,
+}
+
+impl Default for SqlContext {
+  fn default() -> Self {
+    Self::new()
+  }
+}
+
+impl SqlContext {
+  /// Build a new context with defaults
+  pub fn new() -> Self {
+    SqlContext {
+      status: State::Pending,
+      run_at: Utc::now(),
+      lock_at: None,
+      done_at: None,
+      max_attempts: 5,
+      last_error: None,
+      lock_by: None,
+      priority: 0,
+    }
+  }
+
+  /// Set the maximum number of attempts
+  pub fn set_max_attempts(&mut self, max_attempts: i32) {
+    self.max_attempts = max_attempts;
+  }
+
+  /// Gets the maximum attempts for a job. Defaults to 5
+  pub fn max_attempts(&self) -> i32 {
+    self.max_attempts
+  }
+
+  /// Get the time a job was done
+  pub fn done_at(&self) -> &Option<i64> {
+    &self.done_at
+  }
+
+  /// Set the time a job was done
+  pub fn set_done_at(&mut self, done_at: Option<i64>) {
+    self.done_at = done_at;
+  }
+
+  /// Get the time a job is supposed to start
+  pub fn run_at(&self) -> &DateTime<Utc> {
+    &self.run_at
+  }
+
+  /// Set the time a job should run
+  pub fn set_run_at(&mut self, run_at: DateTime<Utc>) {
+    self.run_at = run_at;
+  }
+
+  /// Get the time a job was locked
+  pub fn lock_at(&self) -> &Option<i64> {
+    &self.lock_at
+  }
+
+  /// Set the lock_at value
+  pub fn set_lock_at(&mut self, lock_at: Option<i64>) {
+    self.lock_at = lock_at;
+  }
+
+  /// Get the job status
+  pub fn status(&self) -> &State {
+    &self.status
+  }
+
+  /// Set the job status
+  pub fn set_status(&mut self, status: State) {
+    self.status = status;
+  }
+
+  /// Get the worker currently holding the lock on the job
+  pub fn lock_by(&self) -> &Option<WorkerId> {
+    &self.lock_by
+  }
+
+  /// Set `lock_by`
+  pub fn set_lock_by(&mut self, lock_by: Option<WorkerId>) {
+    self.lock_by = lock_by;
+  }
+
+  /// Get the last error recorded for the job
+  pub fn last_error(&self) -> &Option<String> {
+    &self.last_error
+  }
+
+  /// Set the last error
+  pub fn set_last_error(&mut self, error: Option<String>) {
+    self.last_error = error;
+  }
+
+  /// Set the job priority. Larger values will run sooner. Default is 0.
+  pub fn set_priority(&mut self, priority: i32) {
+    self.priority = priority;
+  }
+
+  /// Get the job priority
+  pub fn priority(&self) -> &i32 {
+    &self.priority
+  }
+}
+
+impl<Req> FromRequest<Request<Req, SqlContext>> for SqlContext {
+  fn from_request(req: &Request<Req, SqlContext>) -> Result<Self, Error> {
+    Ok(req.parts.context.clone())
+  }
+}
diff --git a/trailbase-apalis/src/lib.rs b/trailbase-apalis/src/lib.rs
new file mode 100644
index 00000000..bf15c02f
--- /dev/null
+++ b/trailbase-apalis/src/lib.rs
@@ -0,0 +1,324 @@
+#![allow(clippy::needless_return)]
+#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
+#![cfg_attr(docsrs, feature(doc_cfg))]
+
+//! # trailbase-apalis
+//!
+//! A SQLite storage backend for apalis workers, built on `trailbase-sqlite`
+//! and forked from `apalis-sql`'s SQLite implementation.
+//! See the `sqlite` module for the storage and `context` for per-job state.
+
+use std::{num::TryFromIntError, time::Duration};
+
+use apalis_core::{error::Error, request::State, response::Response};
+
+/// The context of the sql job
+pub mod context;
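+
+// Example (sketch, not part of the fork itself): customizing the per-job
+// `SqlContext` before pushing a request. `job` stands in for any
+// serializable job value:
+//
+//   let mut ctx = context::SqlContext::new();
+//   ctx.set_priority(10);
+//   ctx.set_max_attempts(3);
+//   let request = apalis_core::request::Request::new_with_ctx(job, ctx);
+//   storage.push_request(request).await?;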
+
+/// Sqlite Storage for apalis.
+///
+/// Polls for due jobs ordered by priority and claims them with a guarded
+/// `UPDATE ... WHERE lock_by IS NULL`.
+#[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
+pub mod sqlite;
+
+use context::SqlContext;
+
+/// Config for sql storages
+#[derive(Debug, Clone)]
+pub struct Config {
+  keep_alive: Duration,
+  buffer_size: usize,
+  poll_interval: Duration,
+  reenqueue_orphaned_after: Duration,
+  namespace: String,
+}
+
+/// A general sql error
+#[derive(Debug, thiserror::Error)]
+pub enum SqlError {
+  /// TrailBase error.
+  #[error("SQL Error: {0}")]
+  TB(#[from] trailbase_sqlite::Error),
+  /// Conversion error
+  #[error("TryFromIntError: {0}")]
+  TryFromInt(#[from] TryFromIntError),
+}
+
+impl Default for Config {
+  fn default() -> Self {
+    Self {
+      keep_alive: Duration::from_secs(30),
+      buffer_size: 10,
+      poll_interval: Duration::from_millis(100),
+      reenqueue_orphaned_after: Duration::from_secs(300), // 5 minutes
+      namespace: String::from("apalis::sql"),
+    }
+  }
+}
+
+impl Config {
+  /// Create a new config with a jobs namespace
+  pub fn new(namespace: &str) -> Self {
+    Config::default().set_namespace(namespace)
+  }
+
+  /// Interval between database poll queries
+  ///
+  /// Defaults to 100ms
+  pub fn set_poll_interval(mut self, interval: Duration) -> Self {
+    self.poll_interval = interval;
+    self
+  }
+
+  /// Interval between worker keep-alive database updates
+  ///
+  /// Defaults to 30s
+  pub fn set_keep_alive(mut self, keep_alive: Duration) -> Self {
+    self.keep_alive = keep_alive;
+    self
+  }
+
+  /// Buffer size to use when querying for jobs
+  ///
+  /// Defaults to 10
+  pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
+    self.buffer_size = buffer_size;
+    self
+  }
+
+  /// Set the namespace to consume and push jobs to
+  ///
+  /// Defaults to "apalis::sql"
+  pub fn set_namespace(mut self, namespace: &str) -> Self {
+    self.namespace = namespace.to_string();
+    self
+  }
+
+  /// Gets a reference to the keep_alive duration.
+  pub fn keep_alive(&self) -> &Duration {
+    &self.keep_alive
+  }
+
+  /// Gets a mutable reference to the keep_alive duration.
+  pub fn keep_alive_mut(&mut self) -> &mut Duration {
+    &mut self.keep_alive
+  }
+
+  /// Gets the buffer size.
+  pub fn buffer_size(&self) -> usize {
+    self.buffer_size
+  }
+
+  /// Gets a reference to the poll_interval duration.
+  pub fn poll_interval(&self) -> &Duration {
+    &self.poll_interval
+  }
+
+  /// Gets a mutable reference to the poll_interval duration.
+  pub fn poll_interval_mut(&mut self) -> &mut Duration {
+    &mut self.poll_interval
+  }
+
+  /// Gets a reference to the namespace.
+  pub fn namespace(&self) -> &String {
+    &self.namespace
+  }
+
+  /// Gets a mutable reference to the namespace.
+  pub fn namespace_mut(&mut self) -> &mut String {
+    &mut self.namespace
+  }
+
+  /// Gets the reenqueue_orphaned_after duration.
+  pub fn reenqueue_orphaned_after(&self) -> Duration {
+    self.reenqueue_orphaned_after
+  }
+
+  /// Gets a mutable reference to the reenqueue_orphaned_after.
+  pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
+    &mut self.reenqueue_orphaned_after
+  }
+
+  /// Occasionally workers die or abandon jobs because of panics.
+  /// This is how long an orphaned task stays locked before it is put back
+  /// into the queue.
+  ///
+  /// Defaults to 5 minutes
+  pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
+    self.reenqueue_orphaned_after = after;
+    self
+  }
+}
+
+/// Calculates the status from a result
+pub fn calculate_status<Res>(ctx: &SqlContext, res: &Response<Res>) -> State {
+  match &res.inner {
+    Ok(_) => State::Done,
+    Err(e) => match &e {
+      Error::Abort(_) => State::Killed,
+      Error::Failed(_) if ctx.max_attempts() as usize <= res.attempt.current() => State::Killed,
+      _ => State::Failed,
+    },
+  }
+}
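+
+// Example (sketch): building a `Config` for a dedicated queue. All setters
+// above follow the consuming-builder pattern:
+//
+//   let config = Config::new("apalis::email")
+//     .set_poll_interval(Duration::from_millis(50))
+//     .set_buffer_size(20);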
+
+/// Standard checks for any sql backend
+#[macro_export]
+macro_rules! sql_storage_tests {
+  ($setup:path, $storage_type:ty, $job_type:ty) => {
+    type WrappedStorage = TestWrapper<$storage_type, Request<$job_type, SqlContext>, ()>;
+
+    async fn setup_test_wrapper() -> WrappedStorage {
+      let (mut t, poller) = TestWrapper::new_with_service(
+        $setup().await,
+        apalis_core::service_fn::service_fn(email_service::send_email),
+      );
+      tokio::spawn(poller);
+      t.vacuum().await.unwrap();
+      t
+    }
+
+    async fn push_email_priority(
+      storage: &mut WrappedStorage,
+      email: Email,
+      priority: i32,
+    ) -> TaskId {
+      let mut ctx = SqlContext::new();
+      ctx.set_priority(priority);
+      storage
+        .push_request(Request::new_with_ctx(email, ctx))
+        .await
+        .expect("failed to push a job")
+        .task_id
+    }
+
+    #[tokio::test]
+    async fn integration_test_kill_job() {
+      let mut storage = setup_test_wrapper().await;
+
+      storage
+        .push(email_service::example_killed_email())
+        .await
+        .unwrap();
+
+      let (job_id, res) = storage.execute_next().await.unwrap();
+      assert_eq!(res, Err("AbortError: Invalid character.".to_owned()));
+      apalis_core::sleep(Duration::from_secs(1)).await;
+      let job = storage
+        .fetch_by_id(&job_id)
+        .await
+        .unwrap()
+        .expect("No job found");
+      let ctx = job.parts.context;
+      assert_eq!(*ctx.status(), State::Killed);
+      // assert!(ctx.done_at().is_some());
+      assert_eq!(
+        ctx.last_error().clone().unwrap(),
+        "{\"Err\":\"AbortError: Invalid character.\"}"
+      );
+    }
+
+    #[tokio::test]
+    async fn integration_test_acknowledge_good_job() {
+      let mut storage = setup_test_wrapper().await;
+      storage
+        .push(email_service::example_good_email())
+        .await
+        .unwrap();
+
+      let (job_id, res) = storage.execute_next().await.unwrap();
+      assert_eq!(res, Ok("()".to_owned()));
+      apalis_core::sleep(Duration::from_secs(1)).await;
+      let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
+      let ctx = job.parts.context;
+      assert_eq!(*ctx.status(), State::Done);
+      assert!(ctx.done_at().is_some());
+    }
+
+    #[tokio::test]
+    async fn integration_test_acknowledge_failed_job() {
+      let mut storage = setup_test_wrapper().await;
+
+      storage
+        .push(email_service::example_retry_able_email())
+        .await
+        .unwrap();
+
+      let (job_id, res) = storage.execute_next().await.unwrap();
+      assert_eq!(
+        res,
+        Err("FailedError: Missing separator character '@'.".to_owned())
+      );
+      apalis_core::sleep(Duration::from_secs(1)).await;
+      let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
+      let ctx = job.parts.context;
+      assert_eq!(*ctx.status(), State::Failed);
+      assert!(job.parts.attempt.current() >= 1);
+      assert_eq!(
+        ctx.last_error().clone().unwrap(),
+        "{\"Err\":\"FailedError: Missing separator character '@'.\"}"
+      );
+    }
+
+    #[tokio::test]
+    async fn worker_consume() {
+      use apalis_core::builder::WorkerBuilder;
+      use apalis_core::builder::WorkerFactoryFn;
+      let storage = $setup().await;
+      let mut handle = storage.clone();
+
+      let parts = handle
+        .push(email_service::example_good_email())
+        .await
+        .unwrap();
+
+      async fn task(_job: Email) -> &'static str {
+        tokio::time::sleep(Duration::from_millis(100)).await;
+        "Job well done"
+      }
+      let worker = WorkerBuilder::new("rango-tango").backend(storage);
+      let worker = worker.build_fn(task);
+      let wkr = worker.run();
+
+      let w = wkr.get_handle();
+
+      let runner = async move {
+        apalis_core::sleep(Duration::from_secs(3)).await;
+        let job_id = &parts.task_id;
+        let job = get_job(&mut handle, job_id).await;
+        let ctx = job.parts.context;
+
+        assert_eq!(*ctx.status(), State::Done);
+        assert!(ctx.done_at().is_some());
+        assert!(ctx.lock_by().is_some());
+        assert!(ctx.lock_at().is_some());
+        assert!(ctx.last_error().is_some()); // TODO: rename last_error to last_result
+
+        w.stop();
+      };
+
+      tokio::join!(runner, wkr);
+    }
+
+    #[tokio::test]
+    async fn test_consume_jobs_with_priority() {
+      let mut storage = setup_test_wrapper().await;
+
+      // push several jobs with differing priorities, then ensure they get executed
+      // in priority order.
+      let job2 = push_email_priority(&mut storage, email_service::example_good_email(), 5).await;
+      let job1 = push_email_priority(&mut storage, email_service::example_good_email(), 10).await;
+      let job4 = push_email_priority(&mut storage, email_service::example_good_email(), -1).await;
+      let job3 = push_email_priority(&mut storage, email_service::example_good_email(), 1).await;
+
+      for (job_id, prio) in &[(job1, 10), (job2, 5), (job3, 1), (job4, -1)] {
+        let (exec_job_id, res) = storage.execute_next().await.unwrap();
+        assert_eq!(job_id, &exec_job_id);
+        assert_eq!(res, Ok("()".to_owned()));
+        apalis_core::sleep(Duration::from_millis(500)).await;
+        let job = storage.fetch_by_id(&exec_job_id).await.unwrap().unwrap();
+        let ctx = job.parts.context;
+
+        assert_eq!(*ctx.status(), State::Done);
+        assert_eq!(ctx.priority(), prio);
+      }
+    }
+  };
+}
diff --git a/trailbase-apalis/src/sqlite.rs b/trailbase-apalis/src/sqlite.rs
new file mode 100644
index 00000000..0ad07b47
--- /dev/null
+++ b/trailbase-apalis/src/sqlite.rs
@@ -0,0 +1,975 @@
+use crate::context::SqlContext;
+use crate::{calculate_status, Config, SqlError};
+use apalis_core::backend::{BackendExpose, Stat, WorkerState};
+use apalis_core::codec::json::JsonCodec;
+use apalis_core::error::Error;
+use apalis_core::layers::{Ack, AckLayer};
+use apalis_core::poller::controller::Controller;
+use apalis_core::poller::stream::BackendStream;
+use apalis_core::poller::Poller;
+use apalis_core::request::{Parts, Request, RequestStream, State};
+use apalis_core::response::Response;
+use apalis_core::storage::Storage;
+use apalis_core::task::namespace::Namespace;
+use apalis_core::task::task_id::TaskId;
+use apalis_core::worker::{Context, Event, Worker, WorkerId};
+use apalis_core::{backend::Backend, codec::Codec};
+use async_stream::try_stream;
+use chrono::{DateTime, Utc};
+use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
+use log::error;
+use serde::{de::DeserializeOwned, Serialize};
+use std::any::type_name;
+use std::convert::TryInto;
+use std::fmt;
+use std::fmt::Debug;
+use std::sync::Arc;
+use std::{marker::PhantomData, time::Duration};
+
+pub use trailbase_sqlite::Connection;
+
+type SqlRequest<T> = Request<T, SqlContext>;
+
+/// Represents a [Storage] that persists to Sqlite
+pub struct SqliteStorage<T, C = JsonCodec<String>> {
+  conn: Connection,
+  job_type: PhantomData<T>,
+  controller: Controller,
+  config: Config,
+  codec: PhantomData<C>,
+}
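+
+// The codec parameter defaults to `JsonCodec<String>` as declared above, so
+// a storage for a concrete job type is typically just (sketch; `MyJob` is a
+// placeholder):
+//
+//   let storage = SqliteStorage::<MyJob>::new(conn.clone());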
+
+impl<T, C> fmt::Debug for SqliteStorage<T, C> {
+  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+    f.debug_struct("SqliteStorage")
+      .field("conn", &self.conn)
+      .field("job_type", &"PhantomData")
+      .field("controller", &self.controller)
+      .field("config", &self.config)
+      .field("codec", &std::any::type_name::<C>())
+      .finish()
+  }
+}
+
+impl<T, C> Clone for SqliteStorage<T, C> {
+  fn clone(&self) -> Self {
+    SqliteStorage {
+      conn: self.conn.clone(),
+      job_type: PhantomData,
+      controller: self.controller.clone(),
+      config: self.config.clone(),
+      codec: self.codec,
+    }
+  }
+}
+
+mod main {
+  trailbase_refinery_macros::embed_migrations!("migrations");
+}
+
+impl SqliteStorage<()> {
+  /// Perform migrations for storage
+  pub async fn setup(conn: &Connection) -> Result<(), trailbase_sqlite::Error> {
+    conn.execute("PRAGMA journal_mode = 'WAL'", ()).await?;
+    conn.execute("PRAGMA temp_store = 2", ()).await?;
+    conn.execute("PRAGMA synchronous = NORMAL", ()).await?;
+    conn.execute("PRAGMA cache_size = 64000", ()).await?;
+
+    let runner = main::migrations::runner();
+
+    let _report = conn
+      .call(move |conn| {
+        return runner
+          .run(conn)
+          .map_err(|err| trailbase_sqlite::Error::Other(err.into()));
+      })
+      .await?;
+
+    Ok(())
+  }
+}
+
+impl<T, C> SqliteStorage<T, C> {
+  /// Create a new instance
+  pub fn new(conn: Connection) -> Self {
+    Self {
+      conn,
+      job_type: PhantomData,
+      controller: Controller::new(),
+      config: Config::new(type_name::<T>()),
+      codec: PhantomData,
+    }
+  }
+
+  /// Create a new instance with a custom config
+  pub fn new_with_config(conn: Connection, config: Config) -> Self {
+    Self {
+      conn,
+      job_type: PhantomData,
+      controller: Controller::new(),
+      config,
+      codec: PhantomData,
+    }
+  }
+}
+impl<T, C> SqliteStorage<T, C> {
+  /// Manually notify the storage that the worker is still alive
+  pub async fn keep_alive_at(
+    &mut self,
+    worker: &Worker<Context>,
+    last_seen: i64,
+  ) -> Result<(), trailbase_sqlite::Error> {
+    const QUERY: &str = "INSERT INTO Workers (id, worker_type, storage_name, layers, last_seen)
+      VALUES ($1, $2, $3, $4, $5)
+      ON CONFLICT (id) DO
+        UPDATE SET last_seen = EXCLUDED.last_seen";
+
+    let worker_type = self.config.namespace.clone();
+    let storage_name = std::any::type_name::<Self>();
+
+    self
+      .conn
+      .execute(
+        QUERY,
+        trailbase_sqlite::params!(
+          worker.id().to_string(),
+          worker_type,
+          storage_name,
+          worker.get_service().to_string(),
+          last_seen
+        ),
+      )
+      .await?;
+
+    Ok(())
+  }
+
+  /// Expose the connection for other functionality, eg custom migrations
+  pub fn conn(&self) -> &Connection {
+    &self.conn
+  }
+
+  /// Get the config used by the storage
+  pub fn get_config(&self) -> &Config {
+    &self.config
+  }
+}
+
+impl<T, C> SqliteStorage<T, C> {
+  /// Expose the codec used
+  pub fn codec(&self) -> &PhantomData<C> {
+    &self.codec
+  }
+}
+
+async fn fetch_next(
+  conn: &Connection,
+  worker_id: &WorkerId,
+  id: String,
+  config: &Config,
+) -> Result<Option<SqlRequest<String>>, trailbase_sqlite::Error> {
+  let now: i64 = Utc::now().timestamp();
+  const UPDATE_QUERY: &str = "UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2 AND job_type = ?4";
+
+  let job: Option<SqlRequest<String>> = conn
+    .query_row_f(
+      UPDATE_QUERY,
+      trailbase_sqlite::params!(
+        id.to_string(),
+        worker_id.to_string(),
+        now,
+        config.namespace.clone()
+      ),
+      serde_rusqlite::from_row,
+    )
+    .await?;
+
+  Ok(job)
+}
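+
+// `fetch_next` claims a job in two steps: the guarded UPDATE only succeeds
+// while `status = 'Pending' AND lock_by IS NULL`, and the trailing SELECT
+// re-reads the row filtered by our own `lock_by`. If another worker won the
+// race, the SELECT matches nothing and the claim simply yields `None`.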
+
+impl<T, C> SqliteStorage<T, C>
+where
+  T: DeserializeOwned + Send + Unpin,
+  C: Codec<Compact = String>,
+{
+  fn stream_jobs(
+    &self,
+    worker: &Worker<Context>,
+    interval: Duration,
+    buffer_size: usize,
+  ) -> impl Stream<Item = Result<Option<SqlRequest<T>>, trailbase_sqlite::Error>> {
+    const FETCH_QUERY: &str = "SELECT id FROM Jobs
+      WHERE (status = 'Pending' OR (status = 'Failed' AND attempts < max_attempts)) AND run_at < ?1 AND job_type = ?2 ORDER BY priority DESC LIMIT ?3";
+
+    let conn = self.conn.clone();
+    let worker = worker.clone();
+    let config = self.config.clone();
+    let namespace = Namespace(self.config.namespace.clone());
+    try_stream! {
+      loop {
+        apalis_core::sleep(interval).await;
+        if !worker.is_ready() {
+          continue;
+        }
+        let worker_id = worker.id();
+        let job_type = &config.namespace;
+
+        let now: i64 = Utc::now().timestamp();
+
+        let ids: Vec<String> = conn.read_query_values(FETCH_QUERY, trailbase_sqlite::params!(
+          now,
+          job_type.to_string(),
+          buffer_size as i64,
+          // i64::try_from(buffer_size).map_err(|e| trailbase_sqlite::Error::Other(e.into()))?,
+        ))
+        .await?;
+
+        for id in ids {
+          let res = fetch_next(&conn, worker_id, id, &config).await?;
+          yield match res {
+            None => None::<SqlRequest<T>>,
+            Some(job) => {
+              let (req, parts) = job.take_parts();
+
+              let args = C::decode(req)
+                .map_err(|e| trailbase_sqlite::Error::Other(e.into()))?;
+
+              let mut req = Request::new_with_parts(args, parts);
+              req.parts.namespace = Some(namespace.clone());
+              Some(req)
+            }
+          }
+        };
+      }
+    }
+  }
+}
+
+impl<T, C> Storage for SqliteStorage<T, C>
+where
+  T: Serialize + DeserializeOwned + Send + 'static + Unpin + Sync,
+  C: Codec<Compact = String> + Send + 'static + Sync,
+  C::Error: std::error::Error + Send + Sync + 'static,
+{
+  type Job = T;
+
+  type Error = trailbase_sqlite::Error;
+
+  type Context = SqlContext;
+
+  type Compact = String;
+
+  async fn push_request(
+    &mut self,
+    job: Request<Self::Job, SqlContext>,
+  ) -> Result<Parts<SqlContext>, Self::Error> {
+    const QUERY: &str = "INSERT INTO Jobs VALUES (?1, ?2, ?3, 'Pending', 0, ?4, strftime('%s','now'), NULL, NULL, NULL, NULL, ?5)";
+    let (task, parts) = job.take_parts();
+    let raw = C::encode(&task).map_err(|e| trailbase_sqlite::Error::Other(e.into()))?;
+    let job_type = self.config.namespace.clone();
+
+    self
+      .conn
+      .execute(
+        QUERY,
+        trailbase_sqlite::params!(
+          raw,
+          parts.task_id.to_string(),
+          job_type.to_string(),
+          parts.context.max_attempts() as i64,
+          *parts.context.priority() as i64,
+        ),
+      )
+      .await?;
+
+    Ok(parts)
+  }
+
+  async fn push_raw_request(
+    &mut self,
+    job: Request<Self::Compact, SqlContext>,
+  ) -> Result<Parts<SqlContext>, Self::Error> {
+    const QUERY: &str = "INSERT INTO Jobs VALUES (?1, ?2, ?3, 'Pending', 0, ?4, strftime('%s','now'), NULL, NULL, NULL, NULL, ?5)";
+    let (task, parts) = job.take_parts();
+    let raw = C::encode(&task).map_err(|e| trailbase_sqlite::Error::Other(e.into()))?;
+
+    let job_type = self.config.namespace.clone();
+    self
+      .conn
+      .execute(
+        QUERY,
+        trailbase_sqlite::params!(
+          raw,
+          parts.task_id.to_string(),
+          job_type.to_string(),
+          parts.context.max_attempts() as i64,
+          *parts.context.priority() as i64,
+        ),
+      )
+      .await?;
+    Ok(parts)
+  }
+
+  async fn schedule_request(
+    &mut self,
+    req: Request<Self::Job, SqlContext>,
+    on: i64,
+  ) -> Result<Parts<SqlContext>, Self::Error> {
+    const QUERY: &str =
+      "INSERT INTO Jobs VALUES (?1, ?2, ?3, 'Pending', 0, ?4, ?5, NULL, NULL, NULL, NULL, ?6)";
+    let id = &req.parts.task_id;
+    let job = C::encode(&req.args).map_err(|e| trailbase_sqlite::Error::Other(e.into()))?;
+
+    let job_type = self.config.namespace.clone();
+    self
+      .conn
+      .execute(
+        QUERY,
+        trailbase_sqlite::params!(
+          job,
+          id.to_string(),
+          job_type,
+          req.parts.context.max_attempts() as i64,
+          *req.parts.context.priority() as i64,
+          on,
+        ),
+      )
+      .await?;
+
+    Ok(req.parts)
+  }
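+
+  // `schedule_request`'s `on` argument is a unix timestamp in seconds (it is
+  // inserted into `run_at`, which the fetch query compares against
+  // strftime('%s','now')). E.g., scheduling one minute from now (sketch):
+  //
+  //   let run_at = chrono::Utc::now().timestamp() + 60;
+  //   storage.schedule_request(request, run_at).await?;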
+
+  async fn fetch_by_id(
+    &mut self,
+    job_id: &TaskId,
+  ) -> Result<Option<Request<Self::Job, SqlContext>>, Self::Error> {
+    const FETCH_QUERY: &str = "SELECT * FROM Jobs WHERE id = ?1";
+
+    let res: Option<SqlRequest<String>> = self
+      .conn
+      .query_row_f(FETCH_QUERY, (job_id.to_string(),), serde_rusqlite::from_row)
+      .await?;
+
+    match res {
+      None => Ok(None),
+      Some(job) => Ok(Some({
+        let (req, parts) = job.take_parts();
+        let args = C::decode(req).map_err(|e| trailbase_sqlite::Error::Other(e.into()))?;
+
+        let mut req: Request<T, SqlContext> = Request::new_with_parts(args, parts);
+        req.parts.namespace = Some(Namespace(self.config.namespace.clone()));
+        req
+      })),
+    }
+  }
+
+  async fn len(&mut self) -> Result<i64, Self::Error> {
+    const QUERY: &str = "Select Count(*) as count from Jobs WHERE (status = 'Pending' OR (status = 'Failed' AND attempts < max_attempts))";
+
+    let count: Option<i64> = self
+      .conn
+      .read_query_row_f(QUERY, (), |row| row.get(0))
+      .await?;
+
+    return Ok(count.unwrap_or_default());
+  }
+
+  async fn reschedule(
+    &mut self,
+    job: Request<T, SqlContext>,
+    wait: Duration,
+  ) -> Result<(), Self::Error> {
+    let task_id = job.parts.task_id;
+
+    const QUERY: &str =
+      "UPDATE Jobs SET status = 'Failed', done_at = NULL, lock_by = NULL, lock_at = NULL, run_at = ?2 WHERE id = ?1";
+
+    let wait: i64 = wait.as_secs() as i64;
+    let now: i64 = Utc::now().timestamp();
+    let wait_until = now + wait;
+
+    self
+      .conn
+      .execute(
+        QUERY,
+        trailbase_sqlite::params!(task_id.to_string(), wait_until,),
+      )
+      .await?;
+
+    Ok(())
+  }
+
+  async fn update(&mut self, job: Request<T, SqlContext>) -> Result<(), Self::Error> {
+    let ctx = job.parts.context;
+    let status = ctx.status().to_string();
+    let attempts = job.parts.attempt;
+    let done_at = *ctx.done_at();
+    let lock_by = ctx.lock_by().clone();
+    let lock_at = *ctx.lock_at();
+    let last_error = ctx.last_error().clone();
+    let priority = *ctx.priority();
+    let job_id = job.parts.task_id;
+
+    const QUERY: &str =
+      "UPDATE Jobs SET status = ?1, attempts = ?2, done_at = ?3, lock_by = ?4, lock_at = ?5, last_error = ?6, priority = ?7 WHERE id = ?8";
+
+    self
+      .conn
+      .execute(
+        QUERY,
+        trailbase_sqlite::params!(
+          status.to_owned(),
+          attempts.current() as i64,
+          done_at,
+          lock_by.map(|w| w.name().to_string()),
+          lock_at,
+          last_error,
+          priority as i64,
+          job_id.to_string()
+        ),
+      )
+      .await?;
+
+    Ok(())
+  }
+
+  async fn is_empty(&mut self) -> Result<bool, Self::Error> {
+    self.len().map_ok(|c| c == 0).await
+  }
+
+  async fn vacuum(&mut self) -> Result<usize, Self::Error> {
+    const QUERY: &str = "Delete from Jobs where status='Done'";
+
+    let rows_affected = self.conn.execute(QUERY, ()).await?;
+
+    Ok(rows_affected)
+  }
+}
+
+impl<T, C> SqliteStorage<T, C> {
+  /// Puts the job back into the queue immediately so that another worker may
+  /// consume it.
+  pub async fn retry(
+    &mut self,
+    worker_id: &WorkerId,
+    job_id: &TaskId,
+  ) -> Result<(), trailbase_sqlite::Error> {
+    const QUERY: &str =
+      "UPDATE Jobs SET status = 'Pending', done_at = NULL, lock_by = NULL WHERE id = ?1 AND lock_by = ?2";
+
+    self
+      .conn
+      .execute(
+        QUERY,
+        trailbase_sqlite::params!(job_id.to_string(), worker_id.to_string()),
+      )
+      .await?;
+    Ok(())
+  }
+
+  /// Kill a job
+  pub async fn kill(
+    &mut self,
+    worker_id: &WorkerId,
+    job_id: &TaskId,
+  ) -> Result<(), trailbase_sqlite::Error> {
+    const QUERY: &str =
+      "UPDATE Jobs SET status = 'Killed', done_at = strftime('%s','now') WHERE id = ?1 AND lock_by = ?2";
+
+    self
+      .conn
+      .execute(
+        QUERY,
+        trailbase_sqlite::params!(job_id.to_string(), worker_id.to_string(),),
+      )
+      .await?;
+
+    Ok(())
+  }
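+
+  // Both `retry` and `kill` above only take effect while the job is still
+  // locked by the given worker (`lock_by = ?2`); requests against jobs owned
+  // by other workers are silently ignored.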
+
+  /// Re-enqueue orphaned jobs whose workers have disappeared
+  pub async fn reenqueue_orphaned(
+    &self,
+    count: i32,
+    dead_since: DateTime<Utc>,
+  ) -> Result<(), trailbase_sqlite::Error> {
+    const QUERY: &str = r#"UPDATE Jobs
+      SET status = 'Pending', done_at = NULL, lock_by = NULL, lock_at = NULL, attempts = attempts + 1, last_error = 'Job was abandoned'
+      WHERE id in
+        (SELECT Jobs.id from Jobs INNER join Workers ON lock_by = Workers.id
+          WHERE status = 'Running' AND Workers.last_seen < ?1
+          AND Workers.worker_type = ?2 ORDER BY lock_at ASC LIMIT ?3);"#;
+
+    let job_type = self.config.namespace.clone();
+
+    self
+      .conn
+      .execute(
+        QUERY,
+        trailbase_sqlite::params!(dead_since.timestamp(), job_type, count as i64,),
+      )
+      .await?;
+    Ok(())
+  }
+}
+
+/// Errors that can occur while polling an SQLite database.
+#[derive(thiserror::Error, Debug)]
+pub enum SqlitePollError {
+  /// Error during a keep-alive heartbeat.
+  #[error("Encountered an error during KeepAlive heartbeat: `{0}`")]
+  KeepAliveError(trailbase_sqlite::Error),
+
+  /// Error during re-enqueuing orphaned tasks.
+  #[error("Encountered an error during ReenqueueOrphaned heartbeat: `{0}`")]
+  ReenqueueOrphanedError(trailbase_sqlite::Error),
+}
+
+impl<T, C> Backend<Request<T, SqlContext>> for SqliteStorage<T, C>
+where
+  C: Codec<Compact = String> + Send + 'static + Sync,
+  C::Error: std::error::Error + 'static + Send + Sync,
+  T: Serialize + DeserializeOwned + Sync + Send + Unpin + 'static,
+{
+  type Stream = BackendStream<RequestStream<Request<T, SqlContext>>>;
+  type Layer = AckLayer<SqliteStorage<T, C>, T, SqlContext, C>;
+
+  type Codec = JsonCodec<String>;
+
+  fn poll(mut self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
+    let layer = AckLayer::new(self.clone());
+    let config = self.config.clone();
+    let controller = self.controller.clone();
+    let stream = self
+      .stream_jobs(worker, config.poll_interval, config.buffer_size)
+      .map_err(|e| Error::SourceError(Arc::new(Box::new(e))));
+    let stream = BackendStream::new(stream.boxed(), controller);
+    let requeue_storage = self.clone();
+    let w = worker.clone();
+    let heartbeat = async move {
+      // Re-enqueue any jobs that were abandoned by a previous death of this
+      // worker.
+      if let Err(e) = self
+        .reenqueue_orphaned((config.buffer_size * 10) as i32, Utc::now())
+        .await
+      {
+        w.emit(Event::Error(Box::new(
+          SqlitePollError::ReenqueueOrphanedError(e),
+        )));
+      }
+      loop {
+        let now: i64 = Utc::now().timestamp();
+        if let Err(e) = self.keep_alive_at(&w, now).await {
+          w.emit(Event::Error(Box::new(SqlitePollError::KeepAliveError(e))));
+        }
+        apalis_core::sleep(Duration::from_secs(30)).await;
+      }
+    }
+    .boxed();
+    let w = worker.clone();
+    let reenqueue_beat = async move {
+      loop {
+        let dead_since =
+          Utc::now() - chrono::Duration::from_std(config.reenqueue_orphaned_after).unwrap();
+        if let Err(e) = requeue_storage
+          .reenqueue_orphaned(
+            config
+              .buffer_size
+              .try_into()
+              .expect("could not convert usize to i32"),
+            dead_since,
+          )
+          .await
+        {
+          w.emit(Event::Error(Box::new(
+            SqlitePollError::ReenqueueOrphanedError(e),
+          )));
+        }
+        apalis_core::sleep(config.poll_interval).await;
+      }
+    };
+    Poller::new_with_layer(
+      stream,
+      async {
+        futures::join!(heartbeat, reenqueue_beat);
+      },
+      layer,
+    )
+  }
+}
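+
+// The poller composes three concurrent pieces: the claimed-job stream feeding
+// the worker, a keep-alive heartbeat that also re-enqueues this worker's
+// orphans once at startup, and a periodic sweep that re-enqueues jobs from
+// workers not seen for `reenqueue_orphaned_after`.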
+
+impl<T: Sync + Send, Res: Serialize + Sync, C: Codec<Compact = String> + Send> Ack<T, Res, C>
+  for SqliteStorage<T, C>
+{
+  type Context = SqlContext;
+  type AckError = trailbase_sqlite::Error;
+
+  async fn ack(
+    &mut self,
+    ctx: &Self::Context,
+    res: &Response<Res>,
+  ) -> Result<(), trailbase_sqlite::Error> {
+    let conn = self.conn.clone();
+    const QUERY: &str =
+      "UPDATE Jobs SET status = ?4, attempts = ?5, done_at = strftime('%s','now'), last_error = ?3 WHERE id = ?1 AND lock_by = ?2";
+    let result = serde_json::to_string(&res.inner.as_ref().map_err(|r| r.to_string()))
+      .map_err(|e| trailbase_sqlite::Error::Other(e.into()))?;
+
+    conn
+      .execute(
+        QUERY,
+        trailbase_sqlite::params!(
+          res.task_id.to_string(),
+          ctx
+            .lock_by()
+            .as_ref()
+            .expect("Task is not locked")
+            .to_string(),
+          result,
+          calculate_status(ctx, res).to_string(),
+          res.attempt.current() as i64
+        ),
+      )
+      .await?;
+    Ok(())
+  }
+}
+
+impl<T: Serialize + DeserializeOwned + Sync + Send + Unpin + 'static> BackendExpose<T>
+  for SqliteStorage<T, JsonCodec<String>>
+{
+  type Request = Request<T, Parts<SqlContext>>;
+  type Error = SqlError;
+
+  async fn stats(&self) -> Result<Stat, Self::Error> {
+    const FETCH_QUERY: &str = "SELECT
+      COUNT(1) FILTER (WHERE status = 'Pending') AS pending,
+      COUNT(1) FILTER (WHERE status = 'Running') AS running,
+      COUNT(1) FILTER (WHERE status = 'Done') AS done,
+      COUNT(1) FILTER (WHERE status = 'Failed') AS failed,
+      COUNT(1) FILTER (WHERE status = 'Killed') AS killed
+      FROM Jobs WHERE job_type = ?";
+
+    let res: Option<(i64, i64, i64, i64, i64)> = self
+      .conn
+      .query_row_f(
+        FETCH_QUERY,
+        (self.get_config().namespace().to_string(),),
+        |row| -> Result<_, trailbase_sqlite::Error> {
+          Ok((
+            row.get(0)?,
+            row.get(1)?,
+            row.get(2)?,
+            row.get(3)?,
+            row.get(4)?,
+          ))
+        },
+      )
+      .await?;
+
+    let Some(res) = res else {
+      return Err(trailbase_sqlite::Error::Other("missing".into()).into());
+    };
+
+    Ok(Stat {
+      pending: res.0.try_into()?,
+      running: res.1.try_into()?,
+      dead: res.4.try_into()?,
+      failed: res.3.try_into()?,
+      success: res.2.try_into()?,
+    })
+  }
+
+  async fn list_jobs(
+    &self,
+    status: &State,
+    page: i32,
+  ) -> Result<Vec<Self::Request>, Self::Error> {
+    const FETCH_QUERY: &str = "SELECT * FROM Jobs WHERE status = ? AND job_type = ? ORDER BY done_at DESC, run_at DESC LIMIT 10 OFFSET ?";
+
+    let status = status.to_string();
+    let res: Vec<SqlRequest<String>> = self
+      .conn
+      .read_query_values(
+        FETCH_QUERY,
+        trailbase_sqlite::params!(
+          status,
+          self.get_config().namespace().to_string(),
+          ((page - 1) * 10).to_string(),
+        ),
+      )
+      .await?;
+    Ok(
+      res
+        .into_iter()
+        .map(|j| {
+          let (req, ctx) = j.take_parts();
+          let req = JsonCodec::<String>::decode(req).unwrap();
+          Request::new_with_ctx(req, ctx)
+        })
+        .collect(),
+    )
+  }
+
+  async fn list_workers(&self) -> Result<Vec<Worker<WorkerState>>, Self::Error> {
+    const FETCH_QUERY: &str =
+      "SELECT id, layers, last_seen FROM Workers WHERE worker_type = ? ORDER BY last_seen DESC LIMIT 20 OFFSET ?";
+
+    let res: Vec<(String, String, i64)> = self
+      .conn
+      .read_query_values(
+        FETCH_QUERY,
+        trailbase_sqlite::params!(self.get_config().namespace().to_string(), 0,),
+      )
+      .await?;
+
+    Ok(
+      res
+        .into_iter()
+        .map(|w| Worker::new(WorkerId::new(w.0), WorkerState::new::<Self::Request>(w.1)))
+        .collect(),
+    )
+  }
+}
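+
+// Note for `BackendExpose` consumers: `list_jobs` pages are 1-based and fixed
+// at 10 rows per page (`OFFSET (page - 1) * 10`), and `stats` buckets jobs by
+// their `status` column within the configured namespace.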
+
+#[cfg(test)]
+mod tests {
+  use super::*;
+  // use crate::sql_storage_tests;
+
+  // use apalis_core::request::State;
+  // use chrono::Utc;
+  // use email_service::example_good_email;
+  // use email_service::Email;
+  // use futures::StreamExt;
+
+  // use apalis_core::generic_storage_test;
+  // use apalis_core::test_utils::apalis_test_service_fn;
+  // use apalis_core::test_utils::TestWrapper;
+
+  // sql_storage_tests!(setup::<Email>, SqliteStorage<Email>, Email);
+
+  /// migrate DB and return a storage instance.
+  #[allow(unused)]
+  async fn setup<T>() -> SqliteStorage<T> {
+    // Because connections cannot be shared across async runtimes
+    // (a fresh runtime is created for each test),
+    // we don't share the storage and tests must be run sequentially.
+    let conn = Connection::open_in_memory().unwrap();
+    SqliteStorage::setup(&conn)
+      .await
+      .expect("failed to migrate DB");
+    let config = Config::new("apalis::test");
+    let storage = SqliteStorage::<T>::new_with_config(conn, config);
+
+    storage
+  }
+
+  // #[tokio::test]
+  // async fn test_inmemory_sqlite_worker() {
+  //   let mut sqlite = setup().await;
+  //   sqlite
+  //     .push(Email {
+  //       subject: "Test Subject".to_string(),
+  //       to: "example@sqlite".to_string(),
+  //       text: "Some Text".to_string(),
+  //     })
+  //     .await
+  //     .expect("Unable to push job");
+  //   let len = sqlite.len().await.expect("Could not fetch the jobs count");
+  //   assert_eq!(len, 1);
+  // }
+  //
+  // async fn consume_one(
+  //   storage: &mut SqliteStorage<Email>,
+  //   worker: &Worker<Context>,
+  // ) -> Request<Email, SqlContext> {
+  //   let mut stream = storage
+  //     .stream_jobs(worker, std::time::Duration::from_secs(10), 1)
+  //     .boxed();
+  //   stream
+  //     .next()
+  //     .await
+  //     .expect("stream is empty")
+  //     .expect("failed to poll job")
+  //     .expect("no job is pending")
+  // }
+  //
+  // async fn register_worker_at(
+  //   storage: &mut SqliteStorage<Email>,
+  //   last_seen: i64,
+  // ) -> Worker<Context> {
+  //   let worker_id = WorkerId::new("test-worker");
+  //
+  //   let worker = Worker::new(worker_id, Default::default());
+  //   storage
+  //     .keep_alive_at(&worker, last_seen)
+  //     .await
+  //     .expect("failed to register worker");
+  //
+  //   worker.start();
+  //   worker
+  // }
+
+  // async fn register_worker(storage: &mut SqliteStorage<Email>) -> Worker<Context> {
+  //   register_worker_at(storage, Utc::now().timestamp()).await
+  // }
+  //
+  // async fn push_email(storage: &mut SqliteStorage<Email>, email: Email) {
+  //   storage.push(email).await.expect("failed to push a job");
+  // }
+  //
+  // async fn get_job(
+  //   storage: &mut SqliteStorage<Email>,
+  //   job_id: &TaskId,
+  // ) -> Request<Email, SqlContext> {
+  //   storage
+  //     .fetch_by_id(job_id)
+  //     .await
+  //     .expect("failed to fetch job by id")
+  //     .expect("no job found by id")
+  // }
+  //
+  // #[tokio::test]
+  // async fn test_consume_last_pushed_job() {
+  //   let mut storage = setup().await;
+  //   let worker = register_worker(&mut storage).await;
+  //
+  //   push_email(&mut storage, example_good_email()).await;
+  //   let len = storage.len().await.expect("Could not fetch the jobs count");
+  //   assert_eq!(len, 1);
+  //
+  //   let job = consume_one(&mut storage, &worker).await;
+  //   let ctx = job.parts.context;
+  //   assert_eq!(*ctx.status(), State::Running);
+  //   assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
+  //   assert!(ctx.lock_at().is_some());
+  // }
+  //
+  // #[tokio::test]
+  // async fn test_acknowledge_job() {
+  //   let mut storage = setup().await;
+  //   let worker = register_worker(&mut storage).await;
+  //
+  //   push_email(&mut storage, example_good_email()).await;
+  //   let job = consume_one(&mut storage, &worker).await;
+  //   let job_id = &job.parts.task_id;
+  //   let ctx = &job.parts.context;
+  //   let res = 1usize;
+  //   storage
+  //     .ack(
+  //       ctx,
+  //       &Response::success(res, job_id.clone(), job.parts.attempt.clone()),
+  //     )
+  //     .await
+  //     .expect("failed to acknowledge the job");
+  //
+  //   let job = get_job(&mut storage, job_id).await;
+  //   let ctx = job.parts.context;
+  //   assert_eq!(*ctx.status(), State::Done);
+  //   assert!(ctx.done_at().is_some());
+  // }
+  //
+  // #[tokio::test]
+  // async fn test_kill_job() {
+  //   let mut storage = setup().await;
+  //
+  //   push_email(&mut storage, example_good_email()).await;
+  //
+  //   let worker = register_worker(&mut storage).await;
+  //
+  //   let job = consume_one(&mut storage, &worker).await;
+  //   let job_id = 
&job.parts.task_id; + // + // storage + // .kill(&worker.id(), job_id) + // .await + // .expect("failed to kill job"); + // + // let job = get_job(&mut storage, job_id).await; + // let ctx = job.parts.context; + // assert_eq!(*ctx.status(), State::Killed); + // assert!(ctx.done_at().is_some()); + // } + // + // #[tokio::test] + // async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() { + // let mut storage = setup().await; + // + // push_email(&mut storage, example_good_email()).await; + // + // let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); + // + // let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60); + // let worker = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await; + // + // let job = consume_one(&mut storage, &worker).await; + // let job_id = &job.parts.task_id; + // storage + // .reenqueue_orphaned(1, five_minutes_ago) + // .await + // .expect("failed to heartbeat"); + // let job = get_job(&mut storage, job_id).await; + // let ctx = &job.parts.context; + // assert_eq!(*ctx.status(), State::Pending); + // assert!(ctx.done_at().is_none()); + // assert!(ctx.lock_by().is_none()); + // assert!(ctx.lock_at().is_none()); + // assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_owned())); + // assert_eq!(job.parts.attempt.current(), 1); + // + // let job = consume_one(&mut storage, &worker).await; + // let ctx = &job.parts.context; + // // Simulate worker + // job.parts.attempt.increment(); + // storage + // .ack( + // ctx, + // &Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt), + // ) + // .await + // .unwrap(); + // //end simulate worker + // + // let job = get_job(&mut storage, &job_id).await; + // let ctx = &job.parts.context; + // assert_eq!(*ctx.status(), State::Done); + // assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); + // assert!(ctx.lock_at().is_some()); + // assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned())); + // assert_eq!(job.parts.attempt.current(), 2); + // } + // + // #[tokio::test] + // async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() { + // let mut storage = setup().await; + // + // push_email(&mut storage, example_good_email()).await; + // + // let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); + // let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60); + // let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await; + // + // let job = consume_one(&mut storage, &worker).await; + // let job_id = job.parts.task_id; + // storage + // .reenqueue_orphaned(1, six_minutes_ago) + // .await + // .expect("failed to heartbeat"); + // + // let job = get_job(&mut storage, &job_id).await; + // let ctx = &job.parts.context; + // + // // Simulate worker + // job.parts.attempt.increment(); + // storage + // .ack( + // ctx, + // &Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt), + // ) + // .await + // .unwrap(); + // //end simulate worker + // + // let job = get_job(&mut storage, &job_id).await; + // let ctx = &job.parts.context; + // assert_eq!(*ctx.status(), State::Done); + // assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); + // assert!(ctx.lock_at().is_some()); + // assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned())); + // assert_eq!(job.parts.attempt.current(), 1); + // } +}