mirror of
https://github.com/trailbaseio/trailbase.git
synced 2026-01-09 03:10:17 -06:00
Enable apalis' generic and default storage backend tests.
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -6778,13 +6778,14 @@ dependencies = [
|
||||
"async-std",
|
||||
"async-stream",
|
||||
"chrono",
|
||||
"email_address",
|
||||
"futures",
|
||||
"futures-lite",
|
||||
"log",
|
||||
"once_cell",
|
||||
"rusqlite",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_rusqlite",
|
||||
"thiserror 2.0.12",
|
||||
"tokio",
|
||||
"trailbase-apalis",
|
||||
|
||||
@@ -21,9 +21,9 @@ chrono = { version = "0.4", features = ["serde"] }
|
||||
futures = "0.3.30"
|
||||
futures-lite = "2.3.0"
|
||||
log = "0.4.21"
|
||||
rusqlite.workspace = true
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
serde_rusqlite = "0.38.0"
|
||||
thiserror = "2.0.0"
|
||||
tokio = { workspace = true }
|
||||
trailbase-refinery-core = { workspace = true }
|
||||
@@ -33,6 +33,7 @@ trailbase-sqlite = { workspace = true }
|
||||
[dev-dependencies]
|
||||
apalis = { version = "0.7.0" }
|
||||
apalis-core = { version = "0.7.0", default-features = false, features = [ "test-utils" ] }
|
||||
email_address = "0.2.9"
|
||||
once_cell = "1.19.0"
|
||||
tokio = { workspace = true }
|
||||
trailbase-apalis = { path = "." }
|
||||
|
||||
62
trailbase-apalis/src/from_row.rs
Normal file
62
trailbase-apalis/src/from_row.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
use apalis_core::request::Parts;
|
||||
use apalis_core::task::attempt::Attempt;
|
||||
use apalis_core::task::task_id::TaskId;
|
||||
use apalis_core::{request::Request, worker::WorkerId};
|
||||
|
||||
use crate::context::SqlContext;
|
||||
|
||||
pub(crate) type SqlRequest<T> = Request<T, SqlContext>;
|
||||
|
||||
pub(crate) fn from_row(row: &rusqlite::Row) -> Result<SqlRequest<String>, trailbase_sqlite::Error> {
|
||||
use chrono::DateTime;
|
||||
use std::str::FromStr;
|
||||
|
||||
let job: String = row.get("job")?;
|
||||
let task_id: TaskId = TaskId::from_str(&row.get::<_, String>("id")?)
|
||||
.map_err(|e| trailbase_sqlite::Error::Other(e.into()))?;
|
||||
let mut parts = Parts::<SqlContext>::default();
|
||||
parts.task_id = task_id;
|
||||
|
||||
let attempt: i32 = row.get("attempts").unwrap_or(0);
|
||||
parts.attempt = Attempt::new_with_value(attempt as usize);
|
||||
|
||||
let mut context = crate::context::SqlContext::new();
|
||||
|
||||
let run_at: i64 = row.get("run_at")?;
|
||||
context.set_run_at(DateTime::from_timestamp(run_at, 0).unwrap_or_default());
|
||||
|
||||
if let Ok(max_attempts) = row.get("max_attempts") {
|
||||
context.set_max_attempts(max_attempts)
|
||||
}
|
||||
|
||||
let done_at: Option<i64> = row.get("done_at").unwrap_or_default();
|
||||
context.set_done_at(done_at);
|
||||
|
||||
let lock_at: Option<i64> = row.get("lock_at").unwrap_or_default();
|
||||
context.set_lock_at(lock_at);
|
||||
|
||||
let last_error = row.get("last_error").unwrap_or_default();
|
||||
context.set_last_error(last_error);
|
||||
|
||||
let status: String = row.get("status")?;
|
||||
context.set_status(
|
||||
status
|
||||
.parse()
|
||||
.map_err(|_| trailbase_sqlite::Error::Other("parse failed".into()))?,
|
||||
);
|
||||
|
||||
let lock_by: Option<String> = row.get("lock_by").unwrap_or_default();
|
||||
context.set_lock_by(
|
||||
lock_by
|
||||
.as_deref()
|
||||
.map(WorkerId::from_str)
|
||||
.transpose()
|
||||
.map_err(|_| trailbase_sqlite::Error::Other("transpose failed".into()))?,
|
||||
);
|
||||
|
||||
let priority: i32 = row.get("priority").unwrap_or_default();
|
||||
context.set_priority(priority);
|
||||
|
||||
parts.context = context;
|
||||
Ok(SqlRequest { args: job, parts })
|
||||
}
|
||||
@@ -13,6 +13,8 @@ use apalis_core::{error::Error, request::State, response::Response};
|
||||
|
||||
/// The context of the sql job
|
||||
pub mod context;
|
||||
/// Util for fetching rows
|
||||
pub mod from_row;
|
||||
|
||||
/// Sqlite Storage for apalis.
|
||||
/// Uses a transaction and min(rowid)
|
||||
@@ -199,18 +201,15 @@ macro_rules! sql_storage_tests {
|
||||
.unwrap();
|
||||
|
||||
let (job_id, res) = storage.execute_next().await.unwrap();
|
||||
|
||||
assert_eq!(res, Err("AbortError: Invalid character.".to_owned()));
|
||||
apalis_core::sleep(Duration::from_secs(1)).await;
|
||||
let job = storage
|
||||
.fetch_by_id(&job_id)
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("No job found");
|
||||
let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
|
||||
let ctx = job.parts.context;
|
||||
assert_eq!(*ctx.status(), State::Killed);
|
||||
// assert!(ctx.done_at().is_some());
|
||||
assert_eq!(
|
||||
ctx.last_error().clone().unwrap(),
|
||||
ctx.last_error().clone().expect("error"),
|
||||
"{\"Err\":\"AbortError: Invalid character.\"}"
|
||||
);
|
||||
}
|
||||
@@ -221,12 +220,16 @@ macro_rules! sql_storage_tests {
|
||||
storage
|
||||
.push(email_service::example_good_email())
|
||||
.await
|
||||
.unwrap();
|
||||
.expect("email");
|
||||
|
||||
let (job_id, res) = storage.execute_next().await.unwrap();
|
||||
let (job_id, res) = storage.execute_next().await.expect("exec next");
|
||||
assert_eq!(res, Ok("()".to_owned()));
|
||||
apalis_core::sleep(Duration::from_secs(1)).await;
|
||||
let job = storage.fetch_by_id(&job_id).await.unwrap().unwrap();
|
||||
let job = storage
|
||||
.fetch_by_id(&job_id)
|
||||
.await
|
||||
.expect("result")
|
||||
.expect("job");
|
||||
let ctx = job.parts.context;
|
||||
assert_eq!(*ctx.status(), State::Done);
|
||||
assert!(ctx.done_at().is_some());
|
||||
|
||||
@@ -28,7 +28,7 @@ use std::{marker::PhantomData, time::Duration};
|
||||
|
||||
pub use trailbase_sqlite::Connection;
|
||||
|
||||
type SqlRequest<T> = Request<T, crate::context::SqlContext>;
|
||||
use crate::from_row::{from_row, SqlRequest};
|
||||
|
||||
/// Represents a [Storage] that persists to Sqlite
|
||||
pub struct SqliteStorage<T, C = JsonCodec<String>> {
|
||||
@@ -70,10 +70,16 @@ mod main {
|
||||
impl SqliteStorage<()> {
|
||||
/// Perform migrations for storage
|
||||
pub async fn setup(conn: &Connection) -> Result<(), trailbase_sqlite::Error> {
|
||||
conn.execute("PRAGMA journal_mode = 'WAL'", ()).await?;
|
||||
conn.execute("PRAGMA temp_store = 2", ()).await?;
|
||||
conn.execute("PRAGMA synchronous = NORMAL", ()).await?;
|
||||
conn.execute("PRAGMA cache_size = 64000", ()).await?;
|
||||
conn
|
||||
.execute_batch(
|
||||
r#"
|
||||
PRAGMA journal_mode = 'WAL';
|
||||
PRAGMA temp_store = 2;
|
||||
PRAGMA synchronous = NORMAL;
|
||||
PRAGMA cache_size = 64000;
|
||||
"#,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let runner = main::migrations::runner();
|
||||
|
||||
@@ -169,7 +175,9 @@ async fn fetch_next(
|
||||
config: &Config,
|
||||
) -> Result<Option<SqlRequest<String>>, trailbase_sqlite::Error> {
|
||||
let now: i64 = Utc::now().timestamp();
|
||||
const UPDATE_QUERY : &str = "UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2 AND job_type = ?4";
|
||||
const UPDATE_QUERY: &str = r#"
|
||||
UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3 WHERE id = ?1 AND job_type = ?4 AND status = 'Pending' AND lock_by IS NULL RETURNING *
|
||||
"#;
|
||||
|
||||
let job: Option<SqlRequest<String>> = conn
|
||||
.query_row_f(
|
||||
@@ -180,11 +188,27 @@ async fn fetch_next(
|
||||
now,
|
||||
config.namespace.clone()
|
||||
),
|
||||
serde_rusqlite::from_row,
|
||||
from_row,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(job)
|
||||
if job.is_none() {
|
||||
let job: Option<SqlRequest<String>> = conn
|
||||
.query_row_f(
|
||||
"SELECT * FROM Jobs WHERE id = ?1 AND lock_by = ?2 AND job_type = ?3",
|
||||
trailbase_sqlite::params!(
|
||||
id.to_string(),
|
||||
worker_id.to_string(),
|
||||
config.namespace.clone()
|
||||
),
|
||||
from_row,
|
||||
)
|
||||
.await?;
|
||||
|
||||
return Ok(job);
|
||||
}
|
||||
|
||||
return Ok(job);
|
||||
}
|
||||
|
||||
impl<T, C> SqliteStorage<T, C>
|
||||
@@ -347,7 +371,7 @@ where
|
||||
|
||||
let res: Option<SqlRequest<String>> = self
|
||||
.conn
|
||||
.query_row_f(FETCH_QUERY, (job_id.to_string(),), serde_rusqlite::from_row)
|
||||
.query_row_f(FETCH_QUERY, (job_id.to_string(),), from_row)
|
||||
.await?;
|
||||
|
||||
match res {
|
||||
@@ -364,7 +388,7 @@ where
|
||||
}
|
||||
|
||||
async fn len(&mut self) -> Result<i64, Self::Error> {
|
||||
const QUERY : &str = "Select Count(*) as count from Jobs WHERE (status = 'Pending' OR (status = 'Failed' AND attempts < max_attempts))";
|
||||
const QUERY : &str = "SELECT COUNT(*) AS count FROM Jobs WHERE (status = 'Pending' OR (status = 'Failed' AND attempts < max_attempts))";
|
||||
|
||||
let count: Option<i64> = self
|
||||
.conn
|
||||
@@ -565,6 +589,7 @@ where
|
||||
}
|
||||
}
|
||||
.boxed();
|
||||
|
||||
let w = worker.clone();
|
||||
let reenqueue_beat = async move {
|
||||
loop {
|
||||
@@ -728,22 +753,77 @@ impl<J: 'static + Serialize + DeserializeOwned + Unpin + Send + Sync> BackendExp
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
// use crate::sql_storage_tests;
|
||||
use crate::sql_storage_tests;
|
||||
|
||||
// use apalis_core::request::State;
|
||||
// use chrono::Utc;
|
||||
// use email_service::example_good_email;
|
||||
// use email_service::Email;
|
||||
// use futures::StreamExt;
|
||||
use apalis_core::generic_storage_test;
|
||||
use apalis_core::request::State;
|
||||
use apalis_core::test_utils::apalis_test_service_fn;
|
||||
use apalis_core::test_utils::TestWrapper;
|
||||
use futures::StreamExt;
|
||||
use std::io;
|
||||
|
||||
// use apalis_core::generic_storage_test;
|
||||
// use apalis_core::test_utils::apalis_test_service_fn;
|
||||
// use apalis_core::test_utils::TestWrapper;
|
||||
mod email_service {
|
||||
use apalis_core::error::Error;
|
||||
use email_address::EmailAddress;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
// sql_storage_tests!(setup::<Email>, SqliteStorage<Email>, Email);
|
||||
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||
pub(super) struct Email {
|
||||
pub to: String,
|
||||
pub subject: String,
|
||||
pub text: String,
|
||||
}
|
||||
|
||||
pub(super) async fn send_email(job: Email) -> Result<(), Error> {
|
||||
let validation = EmailAddress::from_str(&job.to);
|
||||
match validation {
|
||||
Ok(email) => {
|
||||
log::info!("Attempting to send email to {}", email.as_str());
|
||||
Ok(())
|
||||
}
|
||||
Err(email_address::Error::InvalidCharacter) => {
|
||||
log::error!("Killed send email job. Invalid character {}", job.to);
|
||||
Err(Error::Abort(Arc::new(Box::new(
|
||||
email_address::Error::InvalidCharacter,
|
||||
))))
|
||||
}
|
||||
Err(e) => Err(Error::Failed(Arc::new(Box::new(e)))),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn example_good_email() -> Email {
|
||||
Email {
|
||||
subject: "Test Subject".to_string(),
|
||||
to: "example@gmail.com".to_string(),
|
||||
text: "Some Text".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn example_killed_email() -> Email {
|
||||
Email {
|
||||
subject: "Test Subject".to_string(),
|
||||
to: "example@©.com".to_string(), // killed because it has © which is invalid
|
||||
text: "Some Text".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn example_retry_able_email() -> Email {
|
||||
Email {
|
||||
subject: "Test Subject".to_string(),
|
||||
to: "example".to_string(),
|
||||
text: "Some Text".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use email_service::Email;
|
||||
|
||||
generic_storage_test!(setup);
|
||||
sql_storage_tests!(setup::<Email>, SqliteStorage<Email>, Email);
|
||||
|
||||
/// migrate DB and return a storage instance.
|
||||
#[allow(unused)]
|
||||
async fn setup<T: Serialize + DeserializeOwned>() -> SqliteStorage<T> {
|
||||
// Because connections cannot be shared across async runtime
|
||||
// (different runtimes are created for each test),
|
||||
@@ -758,218 +838,225 @@ mod tests {
|
||||
storage
|
||||
}
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn test_inmemory_sqlite_worker() {
|
||||
// let mut sqlite = setup().await;
|
||||
// sqlite
|
||||
// .push(Email {
|
||||
// subject: "Test Subject".to_string(),
|
||||
// to: "example@sqlite".to_string(),
|
||||
// text: "Some Text".to_string(),
|
||||
// })
|
||||
// .await
|
||||
// .expect("Unable to push job");
|
||||
// let len = sqlite.len().await.expect("Could not fetch the jobs count");
|
||||
// assert_eq!(len, 1);
|
||||
// }
|
||||
//
|
||||
// async fn consume_one(
|
||||
// storage: &mut SqliteStorage<Email>,
|
||||
// worker: &Worker<Context>,
|
||||
// ) -> Request<Email, SqlContext> {
|
||||
// let mut stream = storage
|
||||
// .stream_jobs(worker, std::time::Duration::from_secs(10), 1)
|
||||
// .boxed();
|
||||
// stream
|
||||
// .next()
|
||||
// .await
|
||||
// .expect("stream is empty")
|
||||
// .expect("failed to poll job")
|
||||
// .expect("no job is pending")
|
||||
// }
|
||||
//
|
||||
// async fn register_worker_at(
|
||||
// storage: &mut SqliteStorage<Email>,
|
||||
// last_seen: i64,
|
||||
// ) -> Worker<Context> {
|
||||
// let worker_id = WorkerId::new("test-worker");
|
||||
//
|
||||
// let worker = Worker::new(worker_id, Default::default());
|
||||
// storage
|
||||
// .keep_alive_at(&worker, last_seen)
|
||||
// .await
|
||||
// .expect("failed to register worker");
|
||||
//
|
||||
// worker.start();
|
||||
// worker
|
||||
// }
|
||||
#[tokio::test]
|
||||
async fn test_inmemory_sqlite_worker() {
|
||||
let mut sqlite = setup().await;
|
||||
sqlite
|
||||
.push(Email {
|
||||
subject: "Test Subject".to_string(),
|
||||
to: "example@sqlite".to_string(),
|
||||
text: "Some Text".to_string(),
|
||||
})
|
||||
.await
|
||||
.expect("Unable to push job");
|
||||
let len = sqlite.len().await.expect("Could not fetch the jobs count");
|
||||
assert_eq!(len, 1);
|
||||
}
|
||||
|
||||
// async fn register_worker(storage: &mut SqliteStorage<Email>) -> Worker<Context> {
|
||||
// register_worker_at(storage, Utc::now().timestamp()).await
|
||||
// }
|
||||
//
|
||||
// async fn push_email(storage: &mut SqliteStorage<Email>, email: Email) {
|
||||
// storage.push(email).await.expect("failed to push a job");
|
||||
// }
|
||||
//
|
||||
// async fn get_job(
|
||||
// storage: &mut SqliteStorage<Email>,
|
||||
// job_id: &TaskId,
|
||||
// ) -> Request<Email, SqlContext> {
|
||||
// storage
|
||||
// .fetch_by_id(job_id)
|
||||
// .await
|
||||
// .expect("failed to fetch job by id")
|
||||
// .expect("no job found by id")
|
||||
// }
|
||||
//
|
||||
// #[tokio::test]
|
||||
// async fn test_consume_last_pushed_job() {
|
||||
// let mut storage = setup().await;
|
||||
// let worker = register_worker(&mut storage).await;
|
||||
//
|
||||
// push_email(&mut storage, example_good_email()).await;
|
||||
// let len = storage.len().await.expect("Could not fetch the jobs count");
|
||||
// assert_eq!(len, 1);
|
||||
//
|
||||
// let job = consume_one(&mut storage, &worker).await;
|
||||
// let ctx = job.parts.context;
|
||||
// assert_eq!(*ctx.status(), State::Running);
|
||||
// assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
|
||||
// assert!(ctx.lock_at().is_some());
|
||||
// }
|
||||
//
|
||||
// #[tokio::test]
|
||||
// async fn test_acknowledge_job() {
|
||||
// let mut storage = setup().await;
|
||||
// let worker = register_worker(&mut storage).await;
|
||||
//
|
||||
// push_email(&mut storage, example_good_email()).await;
|
||||
// let job = consume_one(&mut storage, &worker).await;
|
||||
// let job_id = &job.parts.task_id;
|
||||
// let ctx = &job.parts.context;
|
||||
// let res = 1usize;
|
||||
// storage
|
||||
// .ack(
|
||||
// ctx,
|
||||
// &Response::success(res, job_id.clone(), job.parts.attempt.clone()),
|
||||
// )
|
||||
// .await
|
||||
// .expect("failed to acknowledge the job");
|
||||
//
|
||||
// let job = get_job(&mut storage, job_id).await;
|
||||
// let ctx = job.parts.context;
|
||||
// assert_eq!(*ctx.status(), State::Done);
|
||||
// assert!(ctx.done_at().is_some());
|
||||
// }
|
||||
//
|
||||
// #[tokio::test]
|
||||
// async fn test_kill_job() {
|
||||
// let mut storage = setup().await;
|
||||
//
|
||||
// push_email(&mut storage, example_good_email()).await;
|
||||
//
|
||||
// let worker = register_worker(&mut storage).await;
|
||||
//
|
||||
// let job = consume_one(&mut storage, &worker).await;
|
||||
// let job_id = &job.parts.task_id;
|
||||
//
|
||||
// storage
|
||||
// .kill(&worker.id(), job_id)
|
||||
// .await
|
||||
// .expect("failed to kill job");
|
||||
//
|
||||
// let job = get_job(&mut storage, job_id).await;
|
||||
// let ctx = job.parts.context;
|
||||
// assert_eq!(*ctx.status(), State::Killed);
|
||||
// assert!(ctx.done_at().is_some());
|
||||
// }
|
||||
//
|
||||
// #[tokio::test]
|
||||
// async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() {
|
||||
// let mut storage = setup().await;
|
||||
//
|
||||
// push_email(&mut storage, example_good_email()).await;
|
||||
//
|
||||
// let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60);
|
||||
//
|
||||
// let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60);
|
||||
// let worker = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await;
|
||||
//
|
||||
// let job = consume_one(&mut storage, &worker).await;
|
||||
// let job_id = &job.parts.task_id;
|
||||
// storage
|
||||
// .reenqueue_orphaned(1, five_minutes_ago)
|
||||
// .await
|
||||
// .expect("failed to heartbeat");
|
||||
// let job = get_job(&mut storage, job_id).await;
|
||||
// let ctx = &job.parts.context;
|
||||
// assert_eq!(*ctx.status(), State::Pending);
|
||||
// assert!(ctx.done_at().is_none());
|
||||
// assert!(ctx.lock_by().is_none());
|
||||
// assert!(ctx.lock_at().is_none());
|
||||
// assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_owned()));
|
||||
// assert_eq!(job.parts.attempt.current(), 1);
|
||||
//
|
||||
// let job = consume_one(&mut storage, &worker).await;
|
||||
// let ctx = &job.parts.context;
|
||||
// // Simulate worker
|
||||
// job.parts.attempt.increment();
|
||||
// storage
|
||||
// .ack(
|
||||
// ctx,
|
||||
// &Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt),
|
||||
// )
|
||||
// .await
|
||||
// .unwrap();
|
||||
// //end simulate worker
|
||||
//
|
||||
// let job = get_job(&mut storage, &job_id).await;
|
||||
// let ctx = &job.parts.context;
|
||||
// assert_eq!(*ctx.status(), State::Done);
|
||||
// assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
|
||||
// assert!(ctx.lock_at().is_some());
|
||||
// assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned()));
|
||||
// assert_eq!(job.parts.attempt.current(), 2);
|
||||
// }
|
||||
//
|
||||
// #[tokio::test]
|
||||
// async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() {
|
||||
// let mut storage = setup().await;
|
||||
//
|
||||
// push_email(&mut storage, example_good_email()).await;
|
||||
//
|
||||
// let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60);
|
||||
// let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60);
|
||||
// let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await;
|
||||
//
|
||||
// let job = consume_one(&mut storage, &worker).await;
|
||||
// let job_id = job.parts.task_id;
|
||||
// storage
|
||||
// .reenqueue_orphaned(1, six_minutes_ago)
|
||||
// .await
|
||||
// .expect("failed to heartbeat");
|
||||
//
|
||||
// let job = get_job(&mut storage, &job_id).await;
|
||||
// let ctx = &job.parts.context;
|
||||
//
|
||||
// // Simulate worker
|
||||
// job.parts.attempt.increment();
|
||||
// storage
|
||||
// .ack(
|
||||
// ctx,
|
||||
// &Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt),
|
||||
// )
|
||||
// .await
|
||||
// .unwrap();
|
||||
// //end simulate worker
|
||||
//
|
||||
// let job = get_job(&mut storage, &job_id).await;
|
||||
// let ctx = &job.parts.context;
|
||||
// assert_eq!(*ctx.status(), State::Done);
|
||||
// assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
|
||||
// assert!(ctx.lock_at().is_some());
|
||||
// assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned()));
|
||||
// assert_eq!(job.parts.attempt.current(), 1);
|
||||
// }
|
||||
async fn consume_one(
|
||||
storage: &mut SqliteStorage<Email>,
|
||||
worker: &Worker<Context>,
|
||||
) -> Request<Email, SqlContext> {
|
||||
let mut stream = storage
|
||||
.stream_jobs(worker, std::time::Duration::from_secs(10), 1)
|
||||
.boxed();
|
||||
|
||||
return stream
|
||||
.next()
|
||||
.await
|
||||
.expect("stream is empty")
|
||||
.expect("failed to poll job")
|
||||
.expect("no job is pending");
|
||||
}
|
||||
|
||||
async fn register_worker_at(
|
||||
storage: &mut SqliteStorage<Email>,
|
||||
last_seen: i64,
|
||||
) -> Worker<Context> {
|
||||
let worker_id = WorkerId::new("test-worker");
|
||||
|
||||
let worker = Worker::new(worker_id, Default::default());
|
||||
storage
|
||||
.keep_alive_at(&worker, last_seen)
|
||||
.await
|
||||
.expect("failed to register worker");
|
||||
|
||||
worker.start();
|
||||
worker
|
||||
}
|
||||
|
||||
async fn register_worker(storage: &mut SqliteStorage<Email>) -> Worker<Context> {
|
||||
register_worker_at(storage, Utc::now().timestamp()).await
|
||||
}
|
||||
|
||||
async fn push_email(storage: &mut SqliteStorage<Email>, email: Email) {
|
||||
storage.push(email).await.expect("failed to push a job");
|
||||
}
|
||||
|
||||
async fn get_job(
|
||||
storage: &mut SqliteStorage<Email>,
|
||||
job_id: &TaskId,
|
||||
) -> Request<Email, SqlContext> {
|
||||
storage
|
||||
.fetch_by_id(job_id)
|
||||
.await
|
||||
.expect("failed to fetch job by id")
|
||||
.expect("no job found by id")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_consume_last_pushed_job() {
|
||||
let mut storage = setup().await;
|
||||
let worker = register_worker(&mut storage).await;
|
||||
|
||||
push_email(&mut storage, email_service::example_good_email()).await;
|
||||
let len = storage.len().await.expect("Could not fetch the jobs count");
|
||||
assert_eq!(len, 1);
|
||||
|
||||
let job = consume_one(&mut storage, &worker).await;
|
||||
let ctx = job.parts.context;
|
||||
assert_eq!(*ctx.status(), State::Running);
|
||||
assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
|
||||
assert!(ctx.lock_at().is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_acknowledge_job() {
|
||||
let mut storage = setup().await;
|
||||
let worker = register_worker(&mut storage).await;
|
||||
|
||||
push_email(&mut storage, email_service::example_good_email()).await;
|
||||
let job = consume_one(&mut storage, &worker).await;
|
||||
let job_id = &job.parts.task_id;
|
||||
let ctx = &job.parts.context;
|
||||
let res = 1usize;
|
||||
storage
|
||||
.ack(
|
||||
ctx,
|
||||
&Response::success(res, job_id.clone(), job.parts.attempt.clone()),
|
||||
)
|
||||
.await
|
||||
.expect("failed to acknowledge the job");
|
||||
|
||||
let job = get_job(&mut storage, job_id).await;
|
||||
let ctx = job.parts.context;
|
||||
assert_eq!(*ctx.status(), State::Done);
|
||||
assert!(ctx.done_at().is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_kill_job() {
|
||||
let mut storage = setup().await;
|
||||
|
||||
push_email(&mut storage, email_service::example_good_email()).await;
|
||||
|
||||
let _rows = storage
|
||||
.conn()
|
||||
.read_query_rows("SELECT * FROM Jobs;", ())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let worker = register_worker(&mut storage).await;
|
||||
|
||||
let job = consume_one(&mut storage, &worker).await;
|
||||
let job_id = &job.parts.task_id;
|
||||
|
||||
storage
|
||||
.kill(&worker.id(), job_id)
|
||||
.await
|
||||
.expect("failed to kill job");
|
||||
|
||||
let job = get_job(&mut storage, job_id).await;
|
||||
let ctx = job.parts.context;
|
||||
assert_eq!(*ctx.status(), State::Killed);
|
||||
assert!(ctx.done_at().is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() {
|
||||
let mut storage = setup().await;
|
||||
|
||||
push_email(&mut storage, email_service::example_good_email()).await;
|
||||
|
||||
let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60);
|
||||
|
||||
let five_minutes_ago = Utc::now() - Duration::from_secs(5 * 60);
|
||||
let worker = register_worker_at(&mut storage, six_minutes_ago.timestamp()).await;
|
||||
|
||||
let job = consume_one(&mut storage, &worker).await;
|
||||
let job_id = &job.parts.task_id;
|
||||
storage
|
||||
.reenqueue_orphaned(1, five_minutes_ago)
|
||||
.await
|
||||
.expect("failed to heartbeat");
|
||||
let job = get_job(&mut storage, job_id).await;
|
||||
let ctx = &job.parts.context;
|
||||
assert_eq!(*ctx.status(), State::Pending);
|
||||
assert!(ctx.done_at().is_none());
|
||||
assert!(ctx.lock_by().is_none());
|
||||
assert!(ctx.lock_at().is_none());
|
||||
assert_eq!(*ctx.last_error(), Some("Job was abandoned".to_owned()));
|
||||
assert_eq!(job.parts.attempt.current(), 1);
|
||||
|
||||
let job = consume_one(&mut storage, &worker).await;
|
||||
let ctx = &job.parts.context;
|
||||
// Simulate worker
|
||||
job.parts.attempt.increment();
|
||||
storage
|
||||
.ack(
|
||||
ctx,
|
||||
&Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
//end simulate worker
|
||||
|
||||
let job = get_job(&mut storage, &job_id).await;
|
||||
let ctx = &job.parts.context;
|
||||
assert_eq!(*ctx.status(), State::Done);
|
||||
assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
|
||||
assert!(ctx.lock_at().is_some());
|
||||
assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned()));
|
||||
assert_eq!(job.parts.attempt.current(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() {
|
||||
let mut storage = setup().await;
|
||||
|
||||
push_email(&mut storage, email_service::example_good_email()).await;
|
||||
|
||||
let six_minutes_ago = Utc::now() - Duration::from_secs(6 * 60);
|
||||
let four_minutes_ago = Utc::now() - Duration::from_secs(4 * 60);
|
||||
let worker = register_worker_at(&mut storage, four_minutes_ago.timestamp()).await;
|
||||
|
||||
let job = consume_one(&mut storage, &worker).await;
|
||||
let job_id = job.parts.task_id;
|
||||
storage
|
||||
.reenqueue_orphaned(1, six_minutes_ago)
|
||||
.await
|
||||
.expect("failed to heartbeat");
|
||||
|
||||
let job = get_job(&mut storage, &job_id).await;
|
||||
let ctx = &job.parts.context;
|
||||
|
||||
// Simulate worker
|
||||
job.parts.attempt.increment();
|
||||
storage
|
||||
.ack(
|
||||
ctx,
|
||||
&Response::new(Ok("success".to_owned()), job_id.clone(), job.parts.attempt),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
//end simulate worker
|
||||
|
||||
let job = get_job(&mut storage, &job_id).await;
|
||||
let ctx = &job.parts.context;
|
||||
assert_eq!(*ctx.status(), State::Done);
|
||||
assert_eq!(*ctx.lock_by(), Some(worker.id().clone()));
|
||||
assert!(ctx.lock_at().is_some());
|
||||
assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned()));
|
||||
assert_eq!(job.parts.attempt.current(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user