diff --git a/Server/service/jobQueue.js b/Server/service/jobQueue.js index 177ca34c5..5a9ab335c 100644 --- a/Server/service/jobQueue.js +++ b/Server/service/jobQueue.js @@ -8,6 +8,7 @@ const QUEUE_LOOKUP = { docker: "uptime", pagespeed: "pagespeed", }; +const getSchedulerId = (monitor) => `scheduler:${monitor.type}:${monitor._id}`; import { successMessages, errorMessages } from "../utils/messages.js"; class NewJobQueue { @@ -440,27 +441,27 @@ class NewJobQueue { throw new Error(`Queue for ${monitor.type} not found`); } - // build job options - const jobOptions = { - attempts: 1, - backoff: { - type: "exponential", - delay: 1000, + const jobTemplate = { + name: jobName, + data: monitor, + opts: { + attempts: 1, + backoff: { + type: "exponential", + delay: 1000, + }, + removeOnComplete: true, + removeOnFail: false, + timeout: 1 * 60 * 1000, }, - removeOnComplete: true, - removeOnFail: false, - timeout: 1 * 60 * 1000, }; - // Execute job immediately - await queue.add(jobName, monitor, jobOptions); - await queue.add(jobName, monitor, { - ...jobOptions, - repeat: { - every: monitor?.interval ?? 60000, - immediately: false, - }, - }); + const schedulerId = getSchedulerId(monitor); + await queue.upsertJobScheduler( + schedulerId, + { every: monitor?.interval ?? 60000 }, + jobTemplate + ); const workerStats = await this.getWorkerStats(queue); await this.scaleWorkers(workerStats, queue); @@ -490,9 +491,9 @@ class NewJobQueue { async deleteJob(monitor) { try { const queue = this.queues[QUEUE_LOOKUP[monitor.type]]; - const wasDeleted = await queue.removeRepeatable(monitor._id, { - every: monitor.interval, - }); + const schedulerId = getSchedulerId(monitor); + const wasDeleted = await queue.removeJobScheduler(schedulerId); + if (wasDeleted === true) { this.logger.info({ message: successMessages.JOB_QUEUE_DELETE_JOB,