From ebddc51d85e997e00a1d2a9c9688c72cf689d391 Mon Sep 17 00:00:00 2001 From: Sebastian Jeltsch Date: Sat, 19 Apr 2025 15:26:52 +0200 Subject: [PATCH] Enable apalis' generic and default storage backend tests. --- Cargo.lock | 3 +- trailbase-apalis/Cargo.toml | 3 +- trailbase-apalis/src/from_row.rs | 62 ++++ trailbase-apalis/src/lib.rs | 21 +- trailbase-apalis/src/sqlite.rs | 555 ++++++++++++++++++------------- 5 files changed, 399 insertions(+), 245 deletions(-) create mode 100644 trailbase-apalis/src/from_row.rs diff --git a/Cargo.lock b/Cargo.lock index e3befef2..931403c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6778,13 +6778,14 @@ dependencies = [ "async-std", "async-stream", "chrono", + "email_address", "futures", "futures-lite", "log", "once_cell", + "rusqlite", "serde", "serde_json", - "serde_rusqlite", "thiserror 2.0.12", "tokio", "trailbase-apalis", diff --git a/trailbase-apalis/Cargo.toml b/trailbase-apalis/Cargo.toml index e52c39fb..ae8c8221 100644 --- a/trailbase-apalis/Cargo.toml +++ b/trailbase-apalis/Cargo.toml @@ -21,9 +21,9 @@ chrono = { version = "0.4", features = ["serde"] } futures = "0.3.30" futures-lite = "2.3.0" log = "0.4.21" +rusqlite.workspace = true serde = { version = "1", features = ["derive"] } serde_json = "1" -serde_rusqlite = "0.38.0" thiserror = "2.0.0" tokio = { workspace = true } trailbase-refinery-core = { workspace = true } @@ -33,6 +33,7 @@ trailbase-sqlite = { workspace = true } [dev-dependencies] apalis = { version = "0.7.0" } apalis-core = { version = "0.7.0", default-features = false, features = [ "test-utils" ] } +email_address = "0.2.9" once_cell = "1.19.0" tokio = { workspace = true } trailbase-apalis = { path = "." } diff --git a/trailbase-apalis/src/from_row.rs b/trailbase-apalis/src/from_row.rs new file mode 100644 index 00000000..40c49f4c --- /dev/null +++ b/trailbase-apalis/src/from_row.rs @@ -0,0 +1,62 @@ +use apalis_core::request::Parts; +use apalis_core::task::attempt::Attempt; +use apalis_core::task::task_id::TaskId; +use apalis_core::{request::Request, worker::WorkerId}; + +use crate::context::SqlContext; + +pub(crate) type SqlRequest = Request; + +pub(crate) fn from_row(row: &rusqlite::Row) -> Result, trailbase_sqlite::Error> { + use chrono::DateTime; + use std::str::FromStr; + + let job: String = row.get("job")?; + let task_id: TaskId = TaskId::from_str(&row.get::<_, String>("id")?) + .map_err(|e| trailbase_sqlite::Error::Other(e.into()))?; + let mut parts = Parts::::default(); + parts.task_id = task_id; + + let attempt: i32 = row.get("attempts").unwrap_or(0); + parts.attempt = Attempt::new_with_value(attempt as usize); + + let mut context = crate::context::SqlContext::new(); + + let run_at: i64 = row.get("run_at")?; + context.set_run_at(DateTime::from_timestamp(run_at, 0).unwrap_or_default()); + + if let Ok(max_attempts) = row.get("max_attempts") { + context.set_max_attempts(max_attempts) + } + + let done_at: Option = row.get("done_at").unwrap_or_default(); + context.set_done_at(done_at); + + let lock_at: Option = row.get("lock_at").unwrap_or_default(); + context.set_lock_at(lock_at); + + let last_error = row.get("last_error").unwrap_or_default(); + context.set_last_error(last_error); + + let status: String = row.get("status")?; + context.set_status( + status + .parse() + .map_err(|_| trailbase_sqlite::Error::Other("parse failed".into()))?, + ); + + let lock_by: Option = row.get("lock_by").unwrap_or_default(); + context.set_lock_by( + lock_by + .as_deref() + .map(WorkerId::from_str) + .transpose() + .map_err(|_| trailbase_sqlite::Error::Other("transpose failed".into()))?, + ); + + let priority: i32 = row.get("priority").unwrap_or_default(); + context.set_priority(priority); + + parts.context = context; + Ok(SqlRequest { args: job, parts }) +} diff --git a/trailbase-apalis/src/lib.rs b/trailbase-apalis/src/lib.rs index bf15c02f..8307099d 100644 --- a/trailbase-apalis/src/lib.rs +++ b/trailbase-apalis/src/lib.rs @@ -13,6 +13,8 @@ use apalis_core::{error::Error, request::State, response::Response}; /// The context of the sql job pub mod context; +/// Util for fetching rows +pub mod from_row; /// Sqlite Storage for apalis. /// Uses a transaction and min(rowid) @@ -199,18 +201,15 @@ macro_rules! sql_storage_tests { .unwrap(); let (job_id, res) = storage.execute_next().await.unwrap(); + assert_eq!(res, Err("AbortError: Invalid character.".to_owned())); apalis_core::sleep(Duration::from_secs(1)).await; - let job = storage - .fetch_by_id(&job_id) - .await - .unwrap() - .expect("No job found"); + let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); let ctx = job.parts.context; assert_eq!(*ctx.status(), State::Killed); // assert!(ctx.done_at().is_some()); assert_eq!( - ctx.last_error().clone().unwrap(), + ctx.last_error().clone().expect("error"), "{\"Err\":\"AbortError: Invalid character.\"}" ); } @@ -221,12 +220,16 @@ macro_rules! sql_storage_tests { storage .push(email_service::example_good_email()) .await - .unwrap(); + .expect("email"); - let (job_id, res) = storage.execute_next().await.unwrap(); + let (job_id, res) = storage.execute_next().await.expect("exec next"); assert_eq!(res, Ok("()".to_owned())); apalis_core::sleep(Duration::from_secs(1)).await; - let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap(); + let job = storage + .fetch_by_id(&job_id) + .await + .expect("result") + .expect("job"); let ctx = job.parts.context; assert_eq!(*ctx.status(), State::Done); assert!(ctx.done_at().is_some()); diff --git a/trailbase-apalis/src/sqlite.rs b/trailbase-apalis/src/sqlite.rs index 0ad07b47..8e1ff5b5 100644 --- a/trailbase-apalis/src/sqlite.rs +++ b/trailbase-apalis/src/sqlite.rs @@ -28,7 +28,7 @@ use std::{marker::PhantomData, time::Duration}; pub use trailbase_sqlite::Connection; -type SqlRequest = Request; +use crate::from_row::{from_row, SqlRequest}; /// Represents a [Storage] that persists to Sqlite pub struct SqliteStorage> { @@ -70,10 +70,16 @@ mod main { impl SqliteStorage<()> { /// Perform migrations for storage pub async fn setup(conn: &Connection) -> Result<(), trailbase_sqlite::Error> { - conn.execute("PRAGMA journal_mode = 'WAL'", ()).await?; - conn.execute("PRAGMA temp_store = 2", ()).await?; - conn.execute("PRAGMA synchronous = NORMAL", ()).await?; - conn.execute("PRAGMA cache_size = 64000", ()).await?; + conn + .execute_batch( + r#" + PRAGMA journal_mode = 'WAL'; + PRAGMA temp_store = 2; + PRAGMA synchronous = NORMAL; + PRAGMA cache_size = 64000; + "#, + ) + .await?; let runner = main::migrations::runner(); @@ -169,7 +175,9 @@ async fn fetch_next( config: &Config, ) -> Result>, trailbase_sqlite::Error> { let now: i64 = Utc::now().timestamp(); - const UPDATE_QUERY : &str = "UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2 AND job_type = ?4"; + const UPDATE_QUERY: &str = r#" + UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL RETURNING * + "#; let job: Option> = conn .query_row_f( @@ -180,11 +188,27 @@ async fn fetch_next( now, config.namespace.clone() ), - serde_rusqlite::from_row, + from_row, ) .await?; - Ok(job) + if job.is_none() { + let job: Option> = conn + .query_row_f( + "SELECT * FROM Jobs WHERE id = ?1 AND lock_by = ?2 AND job_type = ?3", + trailbase_sqlite::params!( + id.to_string(), + worker_id.to_string(), + config.namespace.clone() + ), + from_row, + ) + .await?; + + return Ok(job); + } + + return Ok(job); } impl SqliteStorage @@ -347,7 +371,7 @@ where let res: Option> = self .conn - .query_row_f(FETCH_QUERY, (job_id.to_string(),), serde_rusqlite::from_row) + .query_row_f(FETCH_QUERY, (job_id.to_string(),), from_row) .await?; match res { @@ -364,7 +388,7 @@ where } async fn len(&mut self) -> Result { - const QUERY : &str = "Select Count(*) as count from Jobs WHERE (status = 'Pending' OR (status = 'Failed' AND attempts < max_attempts))"; + const QUERY : &str = "SELECT COUNT(*) AS count FROM Jobs WHERE (status = 'Pending' OR (status = 'Failed' AND attempts < max_attempts))"; let count: Option = self .conn @@ -565,6 +589,7 @@ where } } .boxed(); + let w = worker.clone(); let reenqueue_beat = async move { loop { @@ -728,22 +753,77 @@ impl BackendExp #[cfg(test)] mod tests { use super::*; - // use crate::sql_storage_tests; + use crate::sql_storage_tests; - // use apalis_core::request::State; - // use chrono::Utc; - // use email_service::example_good_email; - // use email_service::Email; - // use futures::StreamExt; + use apalis_core::generic_storage_test; + use apalis_core::request::State; + use apalis_core::test_utils::apalis_test_service_fn; + use apalis_core::test_utils::TestWrapper; + use futures::StreamExt; + use std::io; - // use apalis_core::generic_storage_test; - // use apalis_core::test_utils::apalis_test_service_fn; - // use apalis_core::test_utils::TestWrapper; + mod email_service { + use apalis_core::error::Error; + use email_address::EmailAddress; + use serde::{Deserialize, Serialize}; + use std::str::FromStr; + use std::sync::Arc; - // sql_storage_tests!(setup::, SqliteStorage, Email); + #[derive(Debug, Deserialize, Serialize, Clone)] + pub(super) struct Email { + pub to: String, + pub subject: String, + pub text: String, + } + + pub(super) async fn send_email(job: Email) -> Result<(), Error> { + let validation = EmailAddress::from_str(&job.to); + match validation { + Ok(email) => { + log::info!("Attempting to send email to {}", email.as_str()); + Ok(()) + } + Err(email_address::Error::InvalidCharacter) => { + log::error!("Killed send email job. Invalid character {}", job.to); + Err(Error::Abort(Arc::new(Box::new( + email_address::Error::InvalidCharacter, + )))) + } + Err(e) => Err(Error::Failed(Arc::new(Box::new(e)))), + } + } + + pub(super) fn example_good_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@gmail.com".to_string(), + text: "Some Text".to_string(), + } + } + + pub(super) fn example_killed_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@©.com".to_string(), // killed because it has © which is invalid + text: "Some Text".to_string(), + } + } + + pub(super) fn example_retry_able_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example".to_string(), + text: "Some Text".to_string(), + } + } + } + + use email_service::Email; + + generic_storage_test!(setup); + sql_storage_tests!(setup::, SqliteStorage, Email); /// migrate DB and return a storage instance. - #[allow(unused)] async fn setup() -> SqliteStorage { // Because connections cannot be shared across async runtime // (different runtimes are created for each test), @@ -758,218 +838,225 @@ mod tests { storage } - // #[tokio::test] - // async fn test_inmemory_sqlite_worker() { - // let mut sqlite = setup().await; - // sqlite - // .push(Email { - // subject: "Test Subject".to_string(), - // to: "example@sqlite".to_string(), - // text: "Some Text".to_string(), - // }) - // .await - // .expect("Unable to push job"); - // let len = sqlite.len().await.expect("Could not fetch the jobs count"); - // assert_eq!(len, 1); - // } - // - // async fn consume_one( - // storage: &mut SqliteStorage, - // worker: &Worker, - // ) -> Request { - // let mut stream = storage - // .stream_jobs(worker, std::time::Duration::from_secs(10), 1) - // .boxed(); - // stream - // .next() - // .await - // .expect("stream is empty") - // .expect("failed to poll job") - // .expect("no job is pending") - // } - // - // async fn register_worker_at( - // storage: &mut SqliteStorage, - // last_seen: i64, - // ) -> Worker { - // let worker_id = WorkerId::new("test-worker"); - // - // let worker = Worker::new(worker_id, Default::default()); - // storage - // .keep_alive_at(&worker, last_seen) - // .await - // .expect("failed to register worker"); - // - // worker.start(); - // worker - // } + #[tokio::test] + async fn test_inmemory_sqlite_worker() { + let mut sqlite = setup().await; + sqlite + .push(Email { + subject: "Test Subject".to_string(), + to: "example@sqlite".to_string(), + text: "Some Text".to_string(), + }) + .await + .expect("Unable to push job"); + let len = sqlite.len().await.expect("Could not fetch the jobs count"); + assert_eq!(len, 1); + } - // async fn register_worker(storage: &mut SqliteStorage) -> Worker { - // register_worker_at(storage, Utc::now().timestamp()).await - // } - // - // async fn push_email(storage: &mut SqliteStorage, email: Email) { - // storage.push(email).await.expect("failed to push a job"); - // } - // - // async fn get_job( - // storage: &mut SqliteStorage, - // job_id: &TaskId, - // ) -> Request { - // storage - // .fetch_by_id(job_id) - // .await - // .expect("failed to fetch job by id") - // .expect("no job found by id") - // } - // - // #[tokio::test] - // async fn test_consume_last_pushed_job() { - // let mut storage = setup().await; - // let worker = register_worker(&mut storage).await; - // - // push_email(&mut storage, example_good_email()).await; - // let len = storage.len().await.expect("Could not fetch the jobs count"); - // assert_eq!(len, 1); - // - // let job = consume_one(&mut storage, &worker).await; - // let ctx = job.parts.context; - // assert_eq!(*ctx.status(), State::Running); - // assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); - // assert!(ctx.lock_at().is_some()); - // } - // - // #[tokio::test] - // async fn test_acknowledge_job() { - // let mut storage = setup().await; - // let worker = register_worker(&mut storage).await; - // - // push_email(&mut storage, example_good_email()).await; - // let job = consume_one(&mut storage, &worker).await; - // let job_id = &job.parts.task_id; - // let ctx = &job.parts.context; - // let res = 1usize; - // storage - // .ack( - // ctx, - // &Response::success(res, job_id.clone(), job.parts.attempt.clone()), - // ) - // .await - // .expect("failed to acknowledge the job"); - // - // let job = get_job(&mut storage, job_id).await; - // let ctx = job.parts.context; - // assert_eq!(*ctx.status(), State::Done); - // assert!(ctx.done_at().is_some()); - // } - // - // #[tokio::test] - // async fn test_kill_job() { - // let mut storage = setup().await; - // - // push_email(&mut storage, example_good_email()).await; - // - // let worker = register_worker(&mut storage).await; - // - // let job = consume_one(&mut storage, &worker).await; - // let job_id = &job.parts.task_id; - // - // storage - // .kill(&worker.id(), job_id) - // .await - // .expect("failed to kill job"); - // - // let job = get_job(&mut storage, job_id).await; - // let ctx = job.parts.context; - // assert_eq!(*ctx.status(), State::Killed); - // assert!(ctx.done_at().is_some()); - // } - // - // #[tokio::test] - // async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() { - // let mut storage = setup().await; - // - // push_email(&mut storage, example_good_email()).await; - // - // let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); - // - // let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60); - // let worker = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await; - // - // let job = consume_one(&mut storage, &worker).await; - // let job_id = &job.parts.task_id; - // storage - // .reenqueue_orphaned(1, five_minutes_ago) - // .await - // .expect("failed to heartbeat"); - // let job = get_job(&mut storage, job_id).await; - // let ctx = &job.parts.context; - // assert_eq!(*ctx.status(), State::Pending); - // assert!(ctx.done_at().is_none()); - // assert!(ctx.lock_by().is_none()); - // assert!(ctx.lock_at().is_none()); - // assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_owned())); - // assert_eq!(job.parts.attempt.current(), 1); - // - // let job = consume_one(&mut storage, &worker).await; - // let ctx = &job.parts.context; - // // Simulate worker - // job.parts.attempt.increment(); - // storage - // .ack( - // ctx, - // &Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt), - // ) - // .await - // .unwrap(); - // //end simulate worker - // - // let job = get_job(&mut storage, &job_id).await; - // let ctx = &job.parts.context; - // assert_eq!(*ctx.status(), State::Done); - // assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); - // assert!(ctx.lock_at().is_some()); - // assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned())); - // assert_eq!(job.parts.attempt.current(), 2); - // } - // - // #[tokio::test] - // async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() { - // let mut storage = setup().await; - // - // push_email(&mut storage, example_good_email()).await; - // - // let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); - // let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60); - // let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await; - // - // let job = consume_one(&mut storage, &worker).await; - // let job_id = job.parts.task_id; - // storage - // .reenqueue_orphaned(1, six_minutes_ago) - // .await - // .expect("failed to heartbeat"); - // - // let job = get_job(&mut storage, &job_id).await; - // let ctx = &job.parts.context; - // - // // Simulate worker - // job.parts.attempt.increment(); - // storage - // .ack( - // ctx, - // &Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt), - // ) - // .await - // .unwrap(); - // //end simulate worker - // - // let job = get_job(&mut storage, &job_id).await; - // let ctx = &job.parts.context; - // assert_eq!(*ctx.status(), State::Done); - // assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); - // assert!(ctx.lock_at().is_some()); - // assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned())); - // assert_eq!(job.parts.attempt.current(), 1); - // } + async fn consume_one( + storage: &mut SqliteStorage, + worker: &Worker, + ) -> Request { + let mut stream = storage + .stream_jobs(worker, std::time::Duration::from_secs(10), 1) + .boxed(); + + return stream + .next() + .await + .expect("stream is empty") + .expect("failed to poll job") + .expect("no job is pending"); + } + + async fn register_worker_at( + storage: &mut SqliteStorage, + last_seen: i64, + ) -> Worker { + let worker_id = WorkerId::new("test-worker"); + + let worker = Worker::new(worker_id, Default::default()); + storage + .keep_alive_at(&worker, last_seen) + .await + .expect("failed to register worker"); + + worker.start(); + worker + } + + async fn register_worker(storage: &mut SqliteStorage) -> Worker { + register_worker_at(storage, Utc::now().timestamp()).await + } + + async fn push_email(storage: &mut SqliteStorage, email: Email) { + storage.push(email).await.expect("failed to push a job"); + } + + async fn get_job( + storage: &mut SqliteStorage, + job_id: &TaskId, + ) -> Request { + storage + .fetch_by_id(job_id) + .await + .expect("failed to fetch job by id") + .expect("no job found by id") + } + + #[tokio::test] + async fn test_consume_last_pushed_job() { + let mut storage = setup().await; + let worker = register_worker(&mut storage).await; + + push_email(&mut storage, email_service::example_good_email()).await; + let len = storage.len().await.expect("Could not fetch the jobs count"); + assert_eq!(len, 1); + + let job = consume_one(&mut storage, &worker).await; + let ctx = job.parts.context; + assert_eq!(*ctx.status(), State::Running); + assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); + assert!(ctx.lock_at().is_some()); + } + + #[tokio::test] + async fn test_acknowledge_job() { + let mut storage = setup().await; + let worker = register_worker(&mut storage).await; + + push_email(&mut storage, email_service::example_good_email()).await; + let job = consume_one(&mut storage, &worker).await; + let job_id = &job.parts.task_id; + let ctx = &job.parts.context; + let res = 1usize; + storage + .ack( + ctx, + &Response::success(res, job_id.clone(), job.parts.attempt.clone()), + ) + .await + .expect("failed to acknowledge the job"); + + let job = get_job(&mut storage, job_id).await; + let ctx = job.parts.context; + assert_eq!(*ctx.status(), State::Done); + assert!(ctx.done_at().is_some()); + } + + #[tokio::test] + async fn test_kill_job() { + let mut storage = setup().await; + + push_email(&mut storage, email_service::example_good_email()).await; + + let _rows = storage + .conn() + .read_query_rows("SELECT * FROM Jobs;", ()) + .await + .unwrap(); + + let worker = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, &worker).await; + let job_id = &job.parts.task_id; + + storage + .kill(&worker.id(), job_id) + .await + .expect("failed to kill job"); + + let job = get_job(&mut storage, job_id).await; + let ctx = job.parts.context; + assert_eq!(*ctx.status(), State::Killed); + assert!(ctx.done_at().is_some()); + } + + #[tokio::test] + async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() { + let mut storage = setup().await; + + push_email(&mut storage, email_service::example_good_email()).await; + + let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); + + let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60); + let worker = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await; + + let job = consume_one(&mut storage, &worker).await; + let job_id = &job.parts.task_id; + storage + .reenqueue_orphaned(1, five_minutes_ago) + .await + .expect("failed to heartbeat"); + let job = get_job(&mut storage, job_id).await; + let ctx = &job.parts.context; + assert_eq!(*ctx.status(), State::Pending); + assert!(ctx.done_at().is_none()); + assert!(ctx.lock_by().is_none()); + assert!(ctx.lock_at().is_none()); + assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_owned())); + assert_eq!(job.parts.attempt.current(), 1); + + let job = consume_one(&mut storage, &worker).await; + let ctx = &job.parts.context; + // Simulate worker + job.parts.attempt.increment(); + storage + .ack( + ctx, + &Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt), + ) + .await + .unwrap(); + //end simulate worker + + let job = get_job(&mut storage, &job_id).await; + let ctx = &job.parts.context; + assert_eq!(*ctx.status(), State::Done); + assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); + assert!(ctx.lock_at().is_some()); + assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned())); + assert_eq!(job.parts.attempt.current(), 2); + } + + #[tokio::test] + async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() { + let mut storage = setup().await; + + push_email(&mut storage, email_service::example_good_email()).await; + + let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60); + let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60); + let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await; + + let job = consume_one(&mut storage, &worker).await; + let job_id = job.parts.task_id; + storage + .reenqueue_orphaned(1, six_minutes_ago) + .await + .expect("failed to heartbeat"); + + let job = get_job(&mut storage, &job_id).await; + let ctx = &job.parts.context; + + // Simulate worker + job.parts.attempt.increment(); + storage + .ack( + ctx, + &Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt), + ) + .await + .unwrap(); + //end simulate worker + + let job = get_job(&mut storage, &job_id).await; + let ctx = &job.parts.context; + assert_eq!(*ctx.status(), State::Done); + assert_eq!(*ctx.lock_by(), Some(worker.id().clone())); + assert!(ctx.lock_at().is_some()); + assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned())); + assert_eq!(job.parts.attempt.current(), 1); + } }