From 120296763bf46e3723eeb1136059fe5be777fcbe Mon Sep 17 00:00:00 2001 From: Alex Holliday Date: Thu, 31 Oct 2024 16:00:29 +0800 Subject: [PATCH] Attempt to stop active jobs --- Server/service/jobQueue.js | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/Server/service/jobQueue.js b/Server/service/jobQueue.js index 6d46052f1..3793a256a 100644 --- a/Server/service/jobQueue.js +++ b/Server/service/jobQueue.js @@ -36,10 +36,10 @@ class JobQueue { host: redisHost, port: redisPort, }; - this.connection = connection; this.queue = new Queue(QUEUE_NAME, { connection, }); + this.connection = connection; this.workers = []; this.db = null; this.networkService = null; @@ -406,9 +406,6 @@ class JobQueue { delayed: await this.queue.getDelayedCount(), repeatableJobs: (await this.queue.getRepeatableJobs()).length, }; - this.logger.info({ - message: metrics, - }); return metrics; } catch (error) { this.logger.error({ @@ -424,17 +421,33 @@ class JobQueue { * @async * @returns {Promise} - Returns true if obliteration is successful */ + async obliterate() { try { - let metrics = await this.getMetrics(); - this.logger.info({ message: metrics }); + this.logger.info({ message: "Attempting to obliterate job queue..." }); await this.queue.pause(); const jobs = await this.getJobs(); + // Stop currently active jobs + const redisClient = await this.queue.client; + const activeJobs = await this.queue.getJobs(["active"]); + if (activeJobs.length !== 0) { + this.logger.info({ message: "Attempting to stop active jobs..." }); + } + await Promise.all( + activeJobs.map(async (job) => { + await redisClient.del(`${this.queue.toKey(job.id)}:lock`); + await job.remove(); + }) + ); + + // Remove all repeatable jobs for (const job of jobs) { await this.queue.removeRepeatableByKey(job.key); await this.queue.remove(job.id); } + + // Close workers await Promise.all( this.workers.map(async (worker) => { await worker.close(); @@ -442,9 +455,13 @@ class JobQueue { ); await this.queue.obliterate(); - metrics = await this.getMetrics(); - this.logger.info({ message: metrics }); - this.logger.info({ message: successMessages.JOB_QUEUE_OBLITERATE }); + const metrics = await this.getMetrics(); + this.logger.info({ + message: successMessages.JOB_QUEUE_OBLITERATE, + service: SERVICE_NAME, + method: "obliterate", + details: metrics, + }); return true; } catch (error) { error.service === undefined ? (error.service = SERVICE_NAME) : null;