From e222b13b5135cb05736eb859074017da1fb964ef Mon Sep 17 00:00:00 2001 From: Alex Holliday Date: Wed, 1 Jan 2025 13:32:18 -0800 Subject: [PATCH] add separate queue for each monitor type --- Server/service/jobQueue.js | 459 ++++++++++++++++++++++--------------- 1 file changed, 271 insertions(+), 188 deletions(-) diff --git a/Server/service/jobQueue.js b/Server/service/jobQueue.js index bf7e11479..7942e729b 100644 --- a/Server/service/jobQueue.js +++ b/Server/service/jobQueue.js @@ -1,30 +1,22 @@ -const QUEUE_NAME = "monitors"; +const QUEUE_NAMES = ["uptime", "pagespeed", "hardware"]; +const SERVICE_NAME = "NewJobQueue"; const JOBS_PER_WORKER = 5; -const SERVICE_NAME = "JobQueue"; +const QUEUE_LOOKUP = { + hardware: "hardware", + http: "uptime", + ping: "uptime", + docker: "uptime", + pagespeed: "pagespeed", +}; -import { errorMessages, successMessages } from "../utils/messages.js"; -/** - * JobQueue - * - * This service is responsible for managing the job queue. - * It handles enqueuing, dequeuing, and processing jobs. - * It scales the number of workers based on the number of jobs/worker - */ -class JobQueue { +import { successMessages, errorMessages } from "../utils/messages.js"; +class NewJobQueue { static SERVICE_NAME = SERVICE_NAME; - /** - * @class JobQueue - * @classdesc Manages job queue and workers. - * - * @param {Object} statusService - Service for handling status updates. - * @param {Object} notificationService - Service for handling notifications. - * @param {Object} settingsService - Service for retrieving settings. - * @param {Object} logger - Logger for logging information. - * @param {Function} Queue - Queue constructor. - * @param {Function} Worker - Worker constructor. - */ + constructor( + db, statusService, + networkService, notificationService, settingsService, logger, @@ -32,102 +24,60 @@ class JobQueue { Worker ) { const settings = settingsService.getSettings() || {}; - const { redisHost = "127.0.0.1", redisPort = 6379 } = settings; const connection = { host: redisHost, port: redisPort, }; - this.queue = new Queue(QUEUE_NAME, { - connection, - }); + + this.queues = {}; + this.workers = {}; + this.connection = connection; - this.workers = []; - this.db = null; - this.networkService = null; + this.db = db; + this.networkService = networkService; this.statusService = statusService; this.notificationService = notificationService; this.settingsService = settingsService; this.logger = logger; this.Worker = Worker; + + QUEUE_NAMES.forEach((name) => { + this.queues[name] = new Queue(name, { connection }); + this.workers[name] = []; + }); } /** - * Creates and initializes a JobQueue instance. - * - * @param {Object} db - Database service for accessing monitors. - * @param {Object} networkService - Service for network operations. - * @param {Object} statusService - Service for handling status updates. - * @param {Object} notificationService - Service for handling notifications. - * @param {Object} settingsService - Service for retrieving settings. - * @param {Object} logger - Logger for logging information. - * @param {Function} Queue - Queue constructor. - * @param {Function} Worker - Worker constructor. - * @returns {Promise} - The initialized JobQueue instance. - * @throws {Error} - Throws an error if initialization fails. + * Initializes job queues by adding jobs for all active monitors + * @async + * @function initJobQueue + * @description Retrieves all monitors from the database and adds jobs for active ones to their respective queues + * @throws {Error} If there's an error retrieving monitors or adding jobs + * @returns {Promise} */ - static async createJobQueue( - db, - networkService, - statusService, - notificationService, - settingsService, - logger, - Queue, - Worker - ) { - const queue = new JobQueue( - statusService, - notificationService, - settingsService, - logger, - Queue, - Worker - ); - try { - queue.db = db; - queue.networkService = networkService; - const monitors = await db.getAllMonitors(); - for (const monitor of monitors) { - if (monitor.isActive) { - queue.addJob(monitor.id, monitor).catch((error) => { - this.logger.error({ - message: error.message, - service: SERVICE_NAME, - method: "createJobQueue", - stack: error.stack, - }); - }); - } + async initJobQueue() { + const monitors = await this.db.getAllMonitors(); + for (const monitor of monitors) { + if (monitor.isActive) { + await this.addJob(monitor.type, monitor); } - - queue - .getWorkerStats() - .then((workerStats) => { - queue.scaleWorkers(workerStats); - }) - .catch((error) => { - this.logger.error({ - message: error.message, - service: SERVICE_NAME, - method: "createJobQueue", - stack: error.stack, - }); - }); - return queue; - } catch (error) { - error.service === undefined ? (error.service = SERVICE_NAME) : null; - error.method === undefined ? (error.method = "createJobQueue") : null; - throw error; } } /** - * Checks if the given monitor is in a maintenance window. - * - * @param {string} monitorId - The ID of the monitor to check. - * @returns {Promise} - Returns true if the monitor is in a maintenance window, otherwise false. - * @throws {Error} - Throws an error if the database query fails. + * Checks if a monitor is currently in a maintenance window + * @async + * @param {string} monitorId - The ID of the monitor to check + * @returns {Promise} Returns true if the monitor is in an active maintenance window, false otherwise + * @throws {Error} If there's an error retrieving maintenance windows from the database + * @description + * Retrieves all maintenance windows for a monitor and checks if any are currently active. + * A maintenance window is considered active if: + * 1. The window is marked as active AND + * 2. Either: + * - Current time falls between start and end times + * - For repeating windows: Current time falls between any repeated interval */ async isInMaintenanceWindow(monitorId) { const maintenanceWindows = await this.db.getMaintenanceWindowsByMonitorId(monitorId); @@ -160,9 +110,23 @@ class JobQueue { } /** - * Creates a job handler function for processing jobs. + * Creates a job processing handler for monitor checks + * @function createJobHandler + * @returns {Function} An async function that processes monitor check jobs + * @description + * Creates and returns a job handler that: + * 1. Checks if monitor is in maintenance window + * 2. If not in maintenance, performs network status check + * 3. Updates monitor status in database + * 4. Triggers notifications if status changed * - * @returns {Function} An async function that processes a job. + * @param {Object} job - The job to process + * @param {Object} job.data - The monitor data + * @param {string} job.data._id - Monitor ID + * @param {string} job.id - Job ID + * + * @throws {Error} Logs errors but doesn't throw them to prevent job failure + * @returns {Promise} Resolves when job processing is complete */ createJobHandler() { return async (job) => { @@ -206,24 +170,26 @@ class JobQueue { } /** - * Creates a worker for the queue - * Operations are carried out in the async callback - * @returns {Worker} The newly created worker + * Creates a new worker for processing jobs in a queue + * @param {Queue} queue - The BullMQ queue to create a worker for + * @returns {Worker} A new BullMQ worker instance + * @description + * Creates and configures a new worker with: + * - Queue-specific job handler + * - Redis connection settings + * - Default worker options + * The worker processes jobs from the specified queue using the job handler + * created by createJobHandler() + * + * @throws {Error} If worker creation fails or connection is invalid */ - createWorker() { - const worker = new this.Worker(QUEUE_NAME, this.createJobHandler(), { + createWorker(queue) { + const worker = new this.Worker(queue.name, this.createJobHandler(), { connection: this.connection, }); return worker; } - /** - * @typedef {Object} WorkerStats - * @property {Array} jobs - Array of jobs in the Queue - * @property {number} - workerLoad - The number of jobs per worker - * - */ - /** * Gets stats related to the workers * This is used for scaling workers right now @@ -233,10 +199,10 @@ class JobQueue { * @async * @returns {Promise} - Returns the worker stats */ - async getWorkerStats() { + async getWorkerStats(queue) { try { - const jobs = await this.queue.getRepeatableJobs(); - const load = jobs.length / this.workers.length; + const jobs = await queue.getRepeatableJobs(); + const load = jobs.length / this.workers[queue.name].length; return { jobs, load }; } catch (error) { error.service === undefined ? (error.service = SERVICE_NAME) : null; @@ -246,46 +212,56 @@ class JobQueue { } /** - * Scale Workers - * This function scales workers based on the load per worker - * If the load is higher than the JOBS_PER_WORKER threshold, we add more workers - * If the load is lower than the JOBS_PER_WORKER threshold, we release workers - * This approach ignores server performance, which we should add in the future - * + * Scales workers up or down based on queue load + * @async + * @param {Object} workerStats - Statistics about current worker load + * @param {number} workerStats.load - Current load per worker + * @param {Array} workerStats.jobs - Array of current jobs + * @param {Queue} queue - The BullMQ queue to scale workers for + * @returns {Promise} True if scaling occurred, false if no scaling was needed + * @throws {Error} If no workers array exists for the queue + * @description + * Scales workers based on these rules: + * - Maintains minimum of 5 workers + * - Adds workers if load exceeds JOBS_PER_WORKER + * - Removes workers if load is below JOBS_PER_WORKER + * - Creates initial workers if none exist + * Worker scaling is calculated based on excess jobs or excess capacity + */ + async scaleWorkers(workerStats, queue) { + const workers = this.workers[queue.name]; + if (workers === undefined) { + throw new Error(`No workers found for ${queue.name}`); + } - * @async - * @param {WorkerStats} workerStats - The payload for the job. - * @returns {Promise} - */ - async scaleWorkers(workerStats) { - if (this.workers.length === 0) { + if (workers.length === 0) { // There are no workers, need to add one for (let i = 0; i < 5; i++) { - const worker = this.createWorker(); - this.workers.push(worker); + const worker = this.createWorker(queue); + workers.push(worker); } return true; } if (workerStats.load > JOBS_PER_WORKER) { // Find out how many more jobs we have than current workers can handle - const excessJobs = workerStats.jobs.length - this.workers.length * JOBS_PER_WORKER; + const excessJobs = workerStats.jobs.length - workers.length * JOBS_PER_WORKER; // Divide by jobs/worker to find out how many workers to add const workersToAdd = Math.ceil(excessJobs / JOBS_PER_WORKER); for (let i = 0; i < workersToAdd; i++) { - const worker = this.createWorker(); - this.workers.push(worker); + const worker = this.createWorker(queue); + workers.push(worker); } return true; } if (workerStats.load < JOBS_PER_WORKER) { // Find out how much excess capacity we have - const workerCapacity = this.workers.length * JOBS_PER_WORKER; + const workerCapacity = workers.length * JOBS_PER_WORKER; const excessCapacity = workerCapacity - workerStats.jobs.length; // Calculate how many workers to remove let workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER); // Make sure there are always at least 5 - while (workersToRemove > 0 && this.workers.length > 5) { - const worker = this.workers.pop(); + while (workersToRemove > 0 && workers.length > 5) { + const worker = workers.pop(); workersToRemove--; await worker.close().catch((error) => { // Catch the error instead of throwing it @@ -309,9 +285,9 @@ class JobQueue { * @returns {Promise>} * @throws {Error} - Throws error if getting jobs fails */ - async getJobs() { + async getJobs(queue) { try { - const jobs = await this.queue.getRepeatableJobs(); + const jobs = await queue.getRepeatableJobs(); return jobs; } catch (error) { error.service === undefined ? (error.service = SERVICE_NAME) : null; @@ -321,21 +297,32 @@ class JobQueue { } /** - * Retrieves the statistics of jobs and workers. - * - * @returns {Promise} - An object containing job statistics and the number of workers. - * @throws {Error} - Throws an error if the job statistics retrieval fails. + * Retrieves detailed statistics about jobs and workers for all queues + * @async + * @returns {Promise} Queue statistics object + * @throws {Error} If there's an error retrieving job information + * @description + * Returns an object with statistics for each queue including: + * - List of jobs with their URLs and current states + * - Number of workers assigned to the queue */ async getJobStats() { try { - const jobs = await this.queue.getJobs(); - const ret = await Promise.all( - jobs.map(async (job) => { - const state = await job.getState(); - return { url: job.data.url, state }; + let stats = {}; + await Promise.all( + QUEUE_NAMES.map(async (name) => { + const queue = this.queues[name]; + const jobs = await queue.getJobs(); + const ret = await Promise.all( + jobs.map(async (job) => { + const state = await job.getState(); + return { url: job.data.url, state }; + }) + ); + stats[name] = { jobs: ret, workers: this.workers[name].length }; }) ); - return { jobs: ret, workers: this.workers.length }; + return stats; } catch (error) { error.service === undefined ? (error.service = SERVICE_NAME) : null; error.method === undefined ? (error.method = "getJobStats") : null; @@ -344,25 +331,59 @@ class JobQueue { } /** - * Adds a job to the queue and scales workers based on worker stats. - * + * Adds both immediate and repeatable jobs to the appropriate queue * @async - * @param {string} jobName - The name of the job to be added. - * @param {Monitor} payload - The payload for the job. - * @throws {Error} - Will throw an error if the job cannot be added or workers don't scale + * @param {string} jobName - Name identifier for the job + * @param {Object} payload - Job data and configuration + * @param {string} payload.type - Type of monitor/queue ('uptime', 'pagespeed', 'hardware') + * @param {string} [payload.url] - URL to monitor (optional) + * @param {number} [payload.interval=60000] - Repeat interval in milliseconds + * @param {string} payload._id - Monitor ID + * @throws {Error} If queue not found for payload type + * @throws {Error} If job addition fails + * @description + * 1. Identifies correct queue based on payload type + * 2. Adds immediate job execution + * 3. Adds repeatable job with specified interval + * 4. Scales workers based on updated queue load + * Jobs are configured with exponential backoff, single attempt, + * and automatic removal on completion */ async addJob(jobName, payload) { try { this.logger.info({ message: `Adding job ${payload?.url ?? "No URL"}` }); + + // Find the correct queue + + const queue = this.queues[QUEUE_LOOKUP[payload.type]]; + if (queue === undefined) { + throw new Error(`Queue for ${payload.type} not found`); + } + + // build job options + const jobOptions = { + attempts: 1, + backoff: { + type: "exponential", + delay: 1000, + }, + removeOnComplete: true, + removeOnFail: false, + timeout: 1 * 60 * 1000, + }; + // Execute job immediately - await this.queue.add(jobName, payload); - await this.queue.add(jobName, payload, { + await queue.add(jobName, payload, jobOptions); + await queue.add(jobName, payload, { + ...jobOptions, repeat: { every: payload?.interval ?? 60000, + immediately: false, }, }); - const workerStats = await this.getWorkerStats(); - await this.scaleWorkers(workerStats); + + const workerStats = await this.getWorkerStats(queue); + await this.scaleWorkers(workerStats, queue); } catch (error) { error.service === undefined ? (error.service = SERVICE_NAME) : null; error.method === undefined ? (error.method = "addJob") : null; @@ -371,15 +392,25 @@ class JobQueue { } /** - * Deletes a job from the queue. - * + * Deletes a repeatable job from its queue and adjusts worker scaling * @async - * @param {Monitor} monitor - The monitor to remove. - * @throws {Error} + * @param {Object} monitor - Monitor object containing job details + * @param {string} monitor._id - ID of the monitor/job to delete + * @param {string} monitor.type - Type of monitor determining queue selection + * @param {number} monitor.interval - Job repeat interval in milliseconds + * @throws {Error} If queue not found for monitor type + * @throws {Error} If job deletion fails + * @description + * 1. Identifies correct queue based on monitor type + * 2. Removes repeatable job using monitor ID and interval + * 3. Logs success or failure of deletion + * 4. Updates worker scaling based on new queue load + * Returns void but logs operation result */ async deleteJob(monitor) { try { - const wasDeleted = await this.queue.removeRepeatable(monitor._id, { + const queue = this.queues[QUEUE_LOOKUP[monitor.type]]; + const wasDeleted = await queue.removeRepeatable(monitor._id, { every: monitor.interval, }); if (wasDeleted === true) { @@ -389,8 +420,8 @@ class JobQueue { method: "deleteJob", details: `Deleted job ${monitor._id}`, }); - const workerStats = await this.getWorkerStats(); - await this.scaleWorkers(workerStats); + const workerStats = await this.getWorkerStats(queue); + await this.scaleWorkers(workerStats, queue); } else { this.logger.error({ message: errorMessages.JOB_QUEUE_DELETE_JOB, @@ -407,21 +438,59 @@ class JobQueue { } /** - * Retrieves the metrics of the job queue. + * Retrieves comprehensive metrics for all queues + * @async + * @returns {Promise>} Object with metrics for each queue + * @throws {Error} If metrics retrieval fails + * @description + * Collects the following metrics for each queue: + * - Number of waiting jobs + * - Number of active jobs + * - Number of completed jobs + * - Number of failed jobs + * - Number of delayed jobs + * - Number of repeatable jobs + * - Number of active workers * - * @returns {Promise} - An object containing various job queue metrics. - * @throws {Error} - Throws an error if the metrics retrieval fails. + * @typedef {Object} QueueMetrics + * @property {number} waiting - Count of jobs waiting to be processed + * @property {number} active - Count of jobs currently being processed + * @property {number} completed - Count of successfully completed jobs + * @property {number} failed - Count of failed jobs + * @property {number} delayed - Count of delayed jobs + * @property {number} repeatableJobs - Count of repeatable job patterns + * @property {number} workers - Count of active workers for this queue */ async getMetrics() { try { - const metrics = { - waiting: await this.queue.getWaitingCount(), - active: await this.queue.getActiveCount(), - completed: await this.queue.getCompletedCount(), - failed: await this.queue.getFailedCount(), - delayed: await this.queue.getDelayedCount(), - repeatableJobs: (await this.queue.getRepeatableJobs()).length, - }; + let metrics = {}; + + await Promise.all( + QUEUE_NAMES.map(async (name) => { + const queue = this.queues[name]; + const workers = this.workers[name]; + const [waiting, active, completed, failed, delayed, repeatableJobs] = + await Promise.all([ + queue.getWaitingCount(), + queue.getActiveCount(), + queue.getCompletedCount(), + queue.getFailedCount(), + queue.getDelayedCount(), + queue.getRepeatableJobs(), + ]); + + metrics[name] = { + waiting, + active, + completed, + failed, + delayed, + repeatableJobs: repeatableJobs.length, + workers: workers.length, + }; + }) + ); + return metrics; } catch (error) { this.logger.error({ @@ -441,23 +510,37 @@ class JobQueue { async obliterate() { try { this.logger.info({ message: "Attempting to obliterate job queue..." }); - await this.queue.pause(); - const jobs = await this.getJobs(); - - // Remove all repeatable jobs - for (const job of jobs) { - await this.queue.removeRepeatableByKey(job.key); - await this.queue.remove(job.id); - } - - // Close workers await Promise.all( - this.workers.map(async (worker) => { - await worker.close(); + QUEUE_NAMES.map(async (name) => { + const queue = this.queues[name]; + await queue.pause(); + const jobs = await this.getJobs(queue); + + // Remove all repeatable jobs + for (const job of jobs) { + await queue.removeRepeatableByKey(job.key); + await queue.remove(job.id); + } }) ); - await this.queue.obliterate(); + // Close workers + await Promise.all( + QUEUE_NAMES.map(async (name) => { + const workers = this.workers[name]; + await Promise.all( + workers.map(async (worker) => { + await worker.close(); + }) + ); + }) + ); + + QUEUE_NAMES.forEach(async (name) => { + const queue = this.queues[name]; + await queue.obliterate(); + }); + const metrics = await this.getMetrics(); this.logger.info({ message: successMessages.JOB_QUEUE_OBLITERATE, @@ -474,4 +557,4 @@ class JobQueue { } } -export default JobQueue; +export default NewJobQueue;