mirror of https://github.com/bluewave-labs/Checkmate.git
synced 2026-01-24 19:01:01 -06:00
add separate queue for each monitor type

@@ -1,30 +1,22 @@
const QUEUE_NAME = "monitors";
const QUEUE_NAMES = ["uptime", "pagespeed", "hardware"];
const SERVICE_NAME = "NewJobQueue";
const JOBS_PER_WORKER = 5;
const SERVICE_NAME = "JobQueue";
const QUEUE_LOOKUP = {
hardware: "hardware",
http: "uptime",
ping: "uptime",
docker: "uptime",
pagespeed: "pagespeed",
};
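
The constants above are the core of this change: instead of a single "monitors" queue, each monitor type is routed to one of three queues via QUEUE_LOOKUP. A minimal sketch of that routing, using only the constants shown in this hunk (resolveQueueName is a hypothetical helper added here for illustration):

// Hypothetical helper illustrating the routing this commit introduces.
const QUEUE_LOOKUP = {
	hardware: "hardware",
	http: "uptime",
	ping: "uptime",
	docker: "uptime",
	pagespeed: "pagespeed",
};

const resolveQueueName = (monitorType) => {
	const name = QUEUE_LOOKUP[monitorType];
	if (name === undefined) {
		throw new Error(`Queue for ${monitorType} not found`);
	}
	return name;
};

console.log(resolveQueueName("ping")); // "uptime"
console.log(resolveQueueName("pagespeed")); // "pagespeed"
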
import { errorMessages, successMessages } from "../utils/messages.js";

/**
* JobQueue
*
* This service is responsible for managing the job queue.
* It handles enqueuing, dequeuing, and processing jobs.
* It scales the number of workers based on the number of jobs/worker
*/
class JobQueue {
import { successMessages, errorMessages } from "../utils/messages.js";

class NewJobQueue {
static SERVICE_NAME = SERVICE_NAME;

/**
* @class JobQueue
* @classdesc Manages job queue and workers.
*
* @param {Object} statusService - Service for handling status updates.
* @param {Object} notificationService - Service for handling notifications.
* @param {Object} settingsService - Service for retrieving settings.
* @param {Object} logger - Logger for logging information.
* @param {Function} Queue - Queue constructor.
* @param {Function} Worker - Worker constructor.
*/
constructor(
db,
statusService,
networkService,
notificationService,
settingsService,
logger,
@@ -32,102 +24,60 @@ class JobQueue {
Worker
) {
const settings = settingsService.getSettings() || {};

const { redisHost = "127.0.0.1", redisPort = 6379 } = settings;
const connection = {
host: redisHost,
port: redisPort,
};
this.queue = new Queue(QUEUE_NAME, {
connection,
});

this.queues = {};
this.workers = {};

this.connection = connection;
this.workers = [];
this.db = null;
this.networkService = null;
this.db = db;
this.networkService = networkService;
this.statusService = statusService;
this.notificationService = notificationService;
this.settingsService = settingsService;
this.logger = logger;
this.Worker = Worker;

QUEUE_NAMES.forEach((name) => {
this.queues[name] = new Queue(name, { connection });
this.workers[name] = [];
});
}
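
With the new layout, the constructor no longer builds a single this.queue; it builds one BullMQ queue and one (initially empty) worker list per entry in QUEUE_NAMES. A small sketch of the state that loop produces, assuming bullmq's Queue constructor and a local Redis instance:

// Sketch of what the QUEUE_NAMES.forEach loop above builds, assuming bullmq.
import { Queue } from "bullmq";

const connection = { host: "127.0.0.1", port: 6379 };
const queues = {};
const workers = {};
for (const name of ["uptime", "pagespeed", "hardware"]) {
	queues[name] = new Queue(name, { connection }); // one queue per monitor type
	workers[name] = []; // per-queue worker pool, scaled later
}
// queues  -> { uptime: Queue, pagespeed: Queue, hardware: Queue }
// workers -> { uptime: [], pagespeed: [], hardware: [] }
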
/**
* Creates and initializes a JobQueue instance.
*
* @param {Object} db - Database service for accessing monitors.
* @param {Object} networkService - Service for network operations.
* @param {Object} statusService - Service for handling status updates.
* @param {Object} notificationService - Service for handling notifications.
* @param {Object} settingsService - Service for retrieving settings.
* @param {Object} logger - Logger for logging information.
* @param {Function} Queue - Queue constructor.
* @param {Function} Worker - Worker constructor.
* @returns {Promise<JobQueue>} - The initialized JobQueue instance.
* @throws {Error} - Throws an error if initialization fails.
* Initializes job queues by adding jobs for all active monitors
* @async
* @function initJobQueue
* @description Retrieves all monitors from the database and adds jobs for active ones to their respective queues
* @throws {Error} If there's an error retrieving monitors or adding jobs
* @returns {Promise<void>}
*/
static async createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
Queue,
Worker
) {
const queue = new JobQueue(
statusService,
notificationService,
settingsService,
logger,
Queue,
Worker
);
try {
queue.db = db;
queue.networkService = networkService;
const monitors = await db.getAllMonitors();
for (const monitor of monitors) {
if (monitor.isActive) {
queue.addJob(monitor.id, monitor).catch((error) => {
this.logger.error({
message: error.message,
service: SERVICE_NAME,
method: "createJobQueue",
stack: error.stack,
});
});
}
async initJobQueue() {
const monitors = await this.db.getAllMonitors();
for (const monitor of monitors) {
if (monitor.isActive) {
await this.addJob(monitor.type, monitor);
}

queue
.getWorkerStats()
.then((workerStats) => {
queue.scaleWorkers(workerStats);
})
.catch((error) => {
this.logger.error({
message: error.message,
service: SERVICE_NAME,
method: "createJobQueue",
stack: error.stack,
});
});
return queue;
} catch (error) {
error.service === undefined ? (error.service = SERVICE_NAME) : null;
error.method === undefined ? (error.method = "createJobQueue") : null;
throw error;
}
}
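
The static createJobQueue factory is replaced by a plain constructor plus an initJobQueue instance method. A rough startup sketch under that new shape; the import path and the stubbed services are placeholders rather than Checkmate's real wiring, and the constructor argument order (including the Queue constructor cut off by the hunk header) is inferred from the JSDoc above:

// Hypothetical startup wiring for the new class.
import { Queue, Worker } from "bullmq";
import NewJobQueue from "./newJobQueue.js"; // placeholder path

// Minimal stand-ins for the injected services.
const logger = console;
const settingsService = { getSettings: () => ({ redisHost: "127.0.0.1", redisPort: 6379 }) };
const db = { getAllMonitors: async () => [], getMaintenanceWindowsByMonitorId: async () => [] };
const statusService = {};
const networkService = {};
const notificationService = {};

const jobQueue = new NewJobQueue(
	db,
	statusService,
	networkService,
	notificationService,
	settingsService,
	logger,
	Queue,
	Worker
);

// Enqueue an immediate plus repeatable check for every active monitor.
await jobQueue.initJobQueue();
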
/**
* Checks if the given monitor is in a maintenance window.
*
* @param {string} monitorId - The ID of the monitor to check.
* @returns {Promise<boolean>} - Returns true if the monitor is in a maintenance window, otherwise false.
* @throws {Error} - Throws an error if the database query fails.
* Checks if a monitor is currently in a maintenance window
* @async
* @param {string} monitorId - The ID of the monitor to check
* @returns {Promise<boolean>} Returns true if the monitor is in an active maintenance window, false otherwise
* @throws {Error} If there's an error retrieving maintenance windows from the database
* @description
* Retrieves all maintenance windows for a monitor and checks if any are currently active.
* A maintenance window is considered active if:
* 1. The window is marked as active AND
* 2. Either:
* - Current time falls between start and end times
* - For repeating windows: Current time falls between any repeated interval
*/
async isInMaintenanceWindow(monitorId) {
const maintenanceWindows = await this.db.getMaintenanceWindowsByMonitorId(monitorId);
@@ -160,9 +110,23 @@ class JobQueue {
}
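
The body of the check is cut off by the hunk above, but the JSDoc spells out the rule: a window counts only if it is marked active and the current time falls inside it, either directly or after shifting a repeating window forward. A hedged sketch of that test; the field names (active, start, end, repeat) are assumptions about the maintenance-window documents, not taken from this diff:

// Hedged sketch of the maintenance-window rule described in the JSDoc above.
const isWindowActive = (window, now = Date.now()) => {
	if (!window.active) return false; // rule 1: window must be marked active
	const start = new Date(window.start).getTime();
	const end = new Date(window.end).getTime();
	if (now >= start && now <= end) return true; // rule 2a: inside the literal window
	if (window.repeat > 0) {
		// rule 2b: shift a repeating window forward by whole intervals and re-test
		const intervals = Math.floor((now - start) / window.repeat);
		return now >= start + intervals * window.repeat && now <= end + intervals * window.repeat;
	}
	return false;
};

// isInMaintenanceWindow(monitorId) then reduces to:
// maintenanceWindows.some((window) => isWindowActive(window));
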
/**
* Creates a job handler function for processing jobs.
* Creates a job processing handler for monitor checks
* @function createJobHandler
* @returns {Function} An async function that processes monitor check jobs
* @description
* Creates and returns a job handler that:
* 1. Checks if monitor is in maintenance window
* 2. If not in maintenance, performs network status check
* 3. Updates monitor status in database
* 4. Triggers notifications if status changed
*
* @returns {Function} An async function that processes a job.
* @param {Object} job - The job to process
* @param {Object} job.data - The monitor data
* @param {string} job.data._id - Monitor ID
* @param {string} job.id - Job ID
*
* @throws {Error} Logs errors but doesn't throw them to prevent job failure
* @returns {Promise<void>} Resolves when job processing is complete
*/
createJobHandler() {
return async (job) => {
@@ -206,24 +170,26 @@ class JobQueue {
}
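
The handler body is also elided by the hunk above, but the JSDoc gives the flow: skip during maintenance, otherwise run the network check, persist the status, and notify on a status change, logging failures instead of rethrowing them. A free-standing sketch of that flow; the service method names (getStatus, updateStatus, handleNotifications) are assumptions, not confirmed by this hunk:

// Hedged sketch of the per-job flow described above.
const makeJobHandler = ({ isInMaintenanceWindow, networkService, statusService, notificationService, logger }) => {
	return async (job) => {
		try {
			if (await isInMaintenanceWindow(job.data._id)) {
				return; // monitor is in maintenance, skip this check
			}
			const networkResponse = await networkService.getStatus(job); // assumed API
			const { statusChanged } = await statusService.updateStatus(networkResponse); // assumed API
			if (statusChanged === true) {
				await notificationService.handleNotifications(networkResponse); // assumed API
			}
		} catch (error) {
			// Log instead of rethrowing so the BullMQ job itself is not marked failed.
			logger.error({ message: error.message, jobId: job.id, stack: error.stack });
		}
	};
};
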
/**
* Creates a worker for the queue
* Operations are carried out in the async callback
* @returns {Worker} The newly created worker
* Creates a new worker for processing jobs in a queue
* @param {Queue} queue - The BullMQ queue to create a worker for
* @returns {Worker} A new BullMQ worker instance
* @description
* Creates and configures a new worker with:
* - Queue-specific job handler
* - Redis connection settings
* - Default worker options
* The worker processes jobs from the specified queue using the job handler
* created by createJobHandler()
*
* @throws {Error} If worker creation fails or connection is invalid
*/
createWorker() {
const worker = new this.Worker(QUEUE_NAME, this.createJobHandler(), {
createWorker(queue) {
const worker = new this.Worker(queue.name, this.createJobHandler(), {
connection: this.connection,
});
return worker;
}

/**
* @typedef {Object} WorkerStats
* @property {Array<Job>} jobs - Array of jobs in the Queue
* @property {number} - workerLoad - The number of jobs per worker
*
*/

/**
* Gets stats related to the workers
* This is used for scaling workers right now
@@ -233,10 +199,10 @@ class JobQueue {
* @async
* @returns {Promise<WorkerStats>} - Returns the worker stats
*/
async getWorkerStats() {
async getWorkerStats(queue) {
try {
const jobs = await this.queue.getRepeatableJobs();
const load = jobs.length / this.workers.length;
const jobs = await queue.getRepeatableJobs();
const load = jobs.length / this.workers[queue.name].length;
return { jobs, load };
} catch (error) {
error.service === undefined ? (error.service = SERVICE_NAME) : null;
@@ -246,46 +212,56 @@ class JobQueue {
}

/**
* Scale Workers
* This function scales workers based on the load per worker
* If the load is higher than the JOBS_PER_WORKER threshold, we add more workers
* If the load is lower than the JOBS_PER_WORKER threshold, we release workers
* This approach ignores server performance, which we should add in the future
*
* Scales workers up or down based on queue load
* @async
* @param {Object} workerStats - Statistics about current worker load
* @param {number} workerStats.load - Current load per worker
* @param {Array} workerStats.jobs - Array of current jobs
* @param {Queue} queue - The BullMQ queue to scale workers for
* @returns {Promise<boolean>} True if scaling occurred, false if no scaling was needed
* @throws {Error} If no workers array exists for the queue
* @description
* Scales workers based on these rules:
* - Maintains minimum of 5 workers
* - Adds workers if load exceeds JOBS_PER_WORKER
* - Removes workers if load is below JOBS_PER_WORKER
* - Creates initial workers if none exist
* Worker scaling is calculated based on excess jobs or excess capacity
*/
async scaleWorkers(workerStats, queue) {
const workers = this.workers[queue.name];
if (workers === undefined) {
throw new Error(`No workers found for ${queue.name}`);
}

* @async
* @param {WorkerStats} workerStats - The payload for the job.
* @returns {Promise<boolean>}
*/
async scaleWorkers(workerStats) {
if (this.workers.length === 0) {
if (workers.length === 0) {
// There are no workers, need to add one
for (let i = 0; i < 5; i++) {
const worker = this.createWorker();
this.workers.push(worker);
const worker = this.createWorker(queue);
workers.push(worker);
}
return true;
}
if (workerStats.load > JOBS_PER_WORKER) {
// Find out how many more jobs we have than current workers can handle
const excessJobs = workerStats.jobs.length - this.workers.length * JOBS_PER_WORKER;
const excessJobs = workerStats.jobs.length - workers.length * JOBS_PER_WORKER;
// Divide by jobs/worker to find out how many workers to add
const workersToAdd = Math.ceil(excessJobs / JOBS_PER_WORKER);
for (let i = 0; i < workersToAdd; i++) {
const worker = this.createWorker();
this.workers.push(worker);
const worker = this.createWorker(queue);
workers.push(worker);
}
return true;
}

if (workerStats.load < JOBS_PER_WORKER) {
// Find out how much excess capacity we have
const workerCapacity = this.workers.length * JOBS_PER_WORKER;
const workerCapacity = workers.length * JOBS_PER_WORKER;
const excessCapacity = workerCapacity - workerStats.jobs.length;
// Calculate how many workers to remove
let workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER); // Make sure there are always at least 5
while (workersToRemove > 0 && this.workers.length > 5) {
const worker = this.workers.pop();
while (workersToRemove > 0 && workers.length > 5) {
const worker = workers.pop();
workersToRemove--;
await worker.close().catch((error) => {
// Catch the error instead of throwing it
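
The scaling rules above reduce to simple arithmetic around JOBS_PER_WORKER. A small worked example of the add and remove calculations exactly as written in this hunk:

// Worked example of the scaling math, with JOBS_PER_WORKER = 5.
const JOBS_PER_WORKER = 5;
const workersToAdd = (jobs, workers) => Math.ceil((jobs - workers * JOBS_PER_WORKER) / JOBS_PER_WORKER);
const workersToRemove = (jobs, workers) => Math.floor((workers * JOBS_PER_WORKER - jobs) / JOBS_PER_WORKER);

console.log(workersToAdd(23, 3)); // 2: load 23/3 exceeds 5, so the pool grows from 3 to 5 workers
console.log(workersToRemove(12, 7)); // 4, but the while loop keeps a floor of 5, so only 2 workers are closed
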
@@ -309,9 +285,9 @@ class JobQueue {
* @returns {Promise<Array<Job>>}
* @throws {Error} - Throws error if getting jobs fails
*/
async getJobs() {
async getJobs(queue) {
try {
const jobs = await this.queue.getRepeatableJobs();
const jobs = await queue.getRepeatableJobs();
return jobs;
} catch (error) {
error.service === undefined ? (error.service = SERVICE_NAME) : null;
@@ -321,21 +297,32 @@ class JobQueue {
}

/**
* Retrieves the statistics of jobs and workers.
*
* @returns {Promise<Object>} - An object containing job statistics and the number of workers.
* @throws {Error} - Throws an error if the job statistics retrieval fails.
* Retrieves detailed statistics about jobs and workers for all queues
* @async
* @returns {Promise<Object>} Queue statistics object
* @throws {Error} If there's an error retrieving job information
* @description
* Returns an object with statistics for each queue including:
* - List of jobs with their URLs and current states
* - Number of workers assigned to the queue
*/
async getJobStats() {
try {
const jobs = await this.queue.getJobs();
const ret = await Promise.all(
jobs.map(async (job) => {
const state = await job.getState();
return { url: job.data.url, state };
let stats = {};
await Promise.all(
QUEUE_NAMES.map(async (name) => {
const queue = this.queues[name];
const jobs = await queue.getJobs();
const ret = await Promise.all(
jobs.map(async (job) => {
const state = await job.getState();
return { url: job.data.url, state };
})
);
stats[name] = { jobs: ret, workers: this.workers[name].length };
})
);
return { jobs: ret, workers: this.workers.length };
return stats;
} catch (error) {
error.service === undefined ? (error.service = SERVICE_NAME) : null;
error.method === undefined ? (error.method = "getJobStats") : null;
@@ -344,25 +331,59 @@ class JobQueue {
}
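
Because stats are now collected per queue, getJobStats() returns an object keyed by queue name instead of one flat jobs list. An illustrative shape of the result (URLs, states, and counts are made up):

// Illustrative return value of the new getJobStats(); values are examples only.
const exampleStats = {
	uptime: {
		jobs: [{ url: "https://example.com", state: "delayed" }],
		workers: 5,
	},
	pagespeed: { jobs: [], workers: 5 },
	hardware: { jobs: [], workers: 5 },
};
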
/**
* Adds a job to the queue and scales workers based on worker stats.
*
* Adds both immediate and repeatable jobs to the appropriate queue
* @async
* @param {string} jobName - The name of the job to be added.
* @param {Monitor} payload - The payload for the job.
* @throws {Error} - Will throw an error if the job cannot be added or workers don't scale
* @param {string} jobName - Name identifier for the job
* @param {Object} payload - Job data and configuration
* @param {string} payload.type - Type of monitor/queue ('uptime', 'pagespeed', 'hardware')
* @param {string} [payload.url] - URL to monitor (optional)
* @param {number} [payload.interval=60000] - Repeat interval in milliseconds
* @param {string} payload._id - Monitor ID
* @throws {Error} If queue not found for payload type
* @throws {Error} If job addition fails
* @description
* 1. Identifies correct queue based on payload type
* 2. Adds immediate job execution
* 3. Adds repeatable job with specified interval
* 4. Scales workers based on updated queue load
* Jobs are configured with exponential backoff, single attempt,
* and automatic removal on completion
*/
async addJob(jobName, payload) {
try {
this.logger.info({ message: `Adding job ${payload?.url ?? "No URL"}` });

// Find the correct queue
const queue = this.queues[QUEUE_LOOKUP[payload.type]];
if (queue === undefined) {
throw new Error(`Queue for ${payload.type} not found`);
}

// build job options
const jobOptions = {
attempts: 1,
backoff: {
type: "exponential",
delay: 1000,
},
removeOnComplete: true,
removeOnFail: false,
timeout: 1 * 60 * 1000,
};

// Execute job immediately
await this.queue.add(jobName, payload);
await this.queue.add(jobName, payload, {
await queue.add(jobName, payload, jobOptions);
await queue.add(jobName, payload, {
...jobOptions,
repeat: {
every: payload?.interval ?? 60000,
immediately: false,
},
});
const workerStats = await this.getWorkerStats();
await this.scaleWorkers(workerStats);

const workerStats = await this.getWorkerStats(queue);
await this.scaleWorkers(workerStats, queue);
} catch (error) {
error.service === undefined ? (error.service = SERVICE_NAME) : null;
error.method === undefined ? (error.method = "addJob") : null;
@@ -371,15 +392,25 @@ class JobQueue {
}
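
A short usage sketch of the new addJob path (the monitor object is illustrative): the payload's type is looked up in QUEUE_LOOKUP, one immediate job and one repeatable job are added with the options above, and worker scaling is re-evaluated for that queue. initJobQueue passes monitor.type as the job name, so the sketch does the same:

// Illustrative monitor payload; field values are made up.
// jobQueue is the NewJobQueue instance from the startup sketch earlier on this page.
const monitor = {
	_id: "monitor-id", // placeholder id
	type: "http", // QUEUE_LOOKUP.http -> "uptime" queue
	url: "https://example.com",
	interval: 60000, // repeat every 60 seconds
	isActive: true,
};

await jobQueue.addJob(monitor.type, monitor);
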
/**
* Deletes a job from the queue.
*
* Deletes a repeatable job from its queue and adjusts worker scaling
* @async
* @param {Monitor} monitor - The monitor to remove.
* @throws {Error}
* @param {Object} monitor - Monitor object containing job details
* @param {string} monitor._id - ID of the monitor/job to delete
* @param {string} monitor.type - Type of monitor determining queue selection
* @param {number} monitor.interval - Job repeat interval in milliseconds
* @throws {Error} If queue not found for monitor type
* @throws {Error} If job deletion fails
* @description
* 1. Identifies correct queue based on monitor type
* 2. Removes repeatable job using monitor ID and interval
* 3. Logs success or failure of deletion
* 4. Updates worker scaling based on new queue load
* Returns void but logs operation result
*/
async deleteJob(monitor) {
try {
const wasDeleted = await this.queue.removeRepeatable(monitor._id, {
const queue = this.queues[QUEUE_LOOKUP[monitor.type]];
const wasDeleted = await queue.removeRepeatable(monitor._id, {
every: monitor.interval,
});
if (wasDeleted === true) {
@@ -389,8 +420,8 @@ class JobQueue {
method: "deleteJob",
details: `Deleted job ${monitor._id}`,
});
const workerStats = await this.getWorkerStats();
await this.scaleWorkers(workerStats);
const workerStats = await this.getWorkerStats(queue);
await this.scaleWorkers(workerStats, queue);
} else {
this.logger.error({
message: errorMessages.JOB_QUEUE_DELETE_JOB,
@@ -407,21 +438,59 @@ class JobQueue {
}
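
Deleting mirrors adding: the queue is resolved from the monitor's type and the repeatable job is removed with the same interval it was scheduled with, since BullMQ matches repeatable jobs by name plus repeat options. A usage sketch with the same illustrative monitor:

// Remove the repeatable check for the illustrative monitor used above;
// jobQueue is the instance from the earlier startup sketch.
await jobQueue.deleteJob({ _id: "monitor-id", type: "http", interval: 60000 });
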
/**
* Retrieves the metrics of the job queue.
* Retrieves comprehensive metrics for all queues
* @async
* @returns {Promise<Object.<string, QueueMetrics>>} Object with metrics for each queue
* @throws {Error} If metrics retrieval fails
* @description
* Collects the following metrics for each queue:
* - Number of waiting jobs
* - Number of active jobs
* - Number of completed jobs
* - Number of failed jobs
* - Number of delayed jobs
* - Number of repeatable jobs
* - Number of active workers
*
* @returns {Promise<Object>} - An object containing various job queue metrics.
* @throws {Error} - Throws an error if the metrics retrieval fails.
* @typedef {Object} QueueMetrics
* @property {number} waiting - Count of jobs waiting to be processed
* @property {number} active - Count of jobs currently being processed
* @property {number} completed - Count of successfully completed jobs
* @property {number} failed - Count of failed jobs
* @property {number} delayed - Count of delayed jobs
* @property {number} repeatableJobs - Count of repeatable job patterns
* @property {number} workers - Count of active workers for this queue
*/
async getMetrics() {
try {
const metrics = {
waiting: await this.queue.getWaitingCount(),
active: await this.queue.getActiveCount(),
completed: await this.queue.getCompletedCount(),
failed: await this.queue.getFailedCount(),
delayed: await this.queue.getDelayedCount(),
repeatableJobs: (await this.queue.getRepeatableJobs()).length,
};
let metrics = {};

await Promise.all(
QUEUE_NAMES.map(async (name) => {
const queue = this.queues[name];
const workers = this.workers[name];
const [waiting, active, completed, failed, delayed, repeatableJobs] =
await Promise.all([
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getCompletedCount(),
queue.getFailedCount(),
queue.getDelayedCount(),
queue.getRepeatableJobs(),
]);

metrics[name] = {
waiting,
active,
completed,
failed,
delayed,
repeatableJobs: repeatableJobs.length,
workers: workers.length,
};
})
);

return metrics;
} catch (error) {
this.logger.error({
@@ -441,23 +510,37 @@ class JobQueue {
async obliterate() {
try {
this.logger.info({ message: "Attempting to obliterate job queue..." });
await this.queue.pause();
const jobs = await this.getJobs();

// Remove all repeatable jobs
for (const job of jobs) {
await this.queue.removeRepeatableByKey(job.key);
await this.queue.remove(job.id);
}

// Close workers
await Promise.all(
this.workers.map(async (worker) => {
await worker.close();
QUEUE_NAMES.map(async (name) => {
const queue = this.queues[name];
await queue.pause();
const jobs = await this.getJobs(queue);

// Remove all repeatable jobs
for (const job of jobs) {
await queue.removeRepeatableByKey(job.key);
await queue.remove(job.id);
}
})
);

await this.queue.obliterate();
// Close workers
await Promise.all(
QUEUE_NAMES.map(async (name) => {
const workers = this.workers[name];
await Promise.all(
workers.map(async (worker) => {
await worker.close();
})
);
})
);

QUEUE_NAMES.forEach(async (name) => {
const queue = this.queues[name];
await queue.obliterate();
});

const metrics = await this.getMetrics();
this.logger.info({
message: successMessages.JOB_QUEUE_OBLITERATE,
@@ -474,4 +557,4 @@ class JobQueue {
}
}

export default JobQueue;
export default NewJobQueue;
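
Since getJobStats() and getMetrics() are both keyed by queue name now, callers iterate the per-type entries. A brief reading sketch, reusing the jobQueue instance from the startup sketch earlier on this page:

// Read per-queue metrics and stats from the instance created earlier.
const metrics = await jobQueue.getMetrics();
for (const [name, m] of Object.entries(metrics)) {
	console.log(`${name}: ${m.waiting} waiting, ${m.active} active, ${m.workers} workers`);
}

const stats = await jobQueue.getJobStats();
console.log(stats.uptime.jobs.map((job) => `${job.url} -> ${job.state}`));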