mirror of
https://github.com/bluewave-labs/Checkmate.git
synced 2026-05-03 15:09:34 -05:00
Merge pull request #2291 from bluewave-labs/feat/job-queue-refactor
feat: job queue refactor
This commit is contained in:
@@ -23,7 +23,7 @@ class JobQueueController {
|
||||
|
||||
getJobs = async (req, res, next) => {
|
||||
try {
|
||||
const jobs = await this.jobQueue.getJobStats();
|
||||
const jobs = await this.jobQueue.getJobs();
|
||||
return res.success({
|
||||
msg: this.stringService.queueGetMetrics,
|
||||
data: jobs,
|
||||
@@ -60,7 +60,7 @@ class JobQueueController {
|
||||
|
||||
flushQueue = async (req, res, next) => {
|
||||
try {
|
||||
const result = await this.jobQueue.flushQueue();
|
||||
const result = await this.jobQueue.flushQueues();
|
||||
return res.success({
|
||||
msg: this.stringService.jobQueueFlush,
|
||||
data: result,
|
||||
|
||||
@@ -43,7 +43,7 @@ const createCheck = async (checkData) => {
|
||||
|
||||
const createChecks = async (checks) => {
|
||||
try {
|
||||
await Check.insertMany(checks);
|
||||
await Check.insertMany(checks, { ordered: false });
|
||||
} catch (error) {
|
||||
error.service = SERVICE_NAME;
|
||||
error.method = "createCheck";
|
||||
|
||||
@@ -128,9 +128,8 @@ const createDistributedChecks = async (checksData) => {
|
||||
};
|
||||
});
|
||||
|
||||
// Execute bulk operation
|
||||
await DistributedUptimeCheck.bulkWrite(bulkOps, {
|
||||
ordered: false, // Allow parallel processing
|
||||
ordered: false,
|
||||
});
|
||||
} catch (error) {
|
||||
error.service = SERVICE_NAME;
|
||||
|
||||
@@ -51,7 +51,7 @@ const createHardwareCheck = async (hardwareCheckData) => {
|
||||
|
||||
const createHardwareChecks = async (hardwareChecks) => {
|
||||
try {
|
||||
await HardwareCheck.insertMany(hardwareChecks);
|
||||
await HardwareCheck.insertMany(hardwareChecks, { ordered: false });
|
||||
return true;
|
||||
} catch (error) {
|
||||
error.service = SERVICE_NAME;
|
||||
|
||||
@@ -26,7 +26,7 @@ const createPageSpeedCheck = async (pageSpeedCheckData) => {
|
||||
};
|
||||
const createPageSpeedChecks = async (pageSpeedChecks) => {
|
||||
try {
|
||||
await PageSpeedCheck.insertMany(pageSpeedChecks);
|
||||
await PageSpeedCheck.insertMany(pageSpeedChecks, { ordered: false });
|
||||
return true;
|
||||
} catch (error) {
|
||||
error.service = SERVICE_NAME;
|
||||
|
||||
+18
-23
@@ -46,7 +46,8 @@ import DiagnosticRoutes from "./routes/diagnosticRoute.js";
|
||||
import DiagnosticController from "./controllers/diagnosticController.js";
|
||||
|
||||
//JobQueue service and dependencies
|
||||
import JobQueue from "./service/jobQueue.js";
|
||||
import JobQueue from "./service/JobQueue/JobQueue.js";
|
||||
import JobQueueHelper from "./service/JobQueue/JobQueueHelper.js";
|
||||
import { Queue, Worker } from "bullmq";
|
||||
|
||||
//Network service and dependencies
|
||||
@@ -112,17 +113,13 @@ const shutdown = async () => {
|
||||
service: SERVICE_NAME,
|
||||
method: "shutdown",
|
||||
});
|
||||
// flush Redis
|
||||
const redisService = ServiceRegistry.get(RedisService.SERVICE_NAME);
|
||||
await redisService.flushall();
|
||||
|
||||
await ServiceRegistry.get(RedisService.SERVICE_NAME).flushRedis();
|
||||
process.exit(1);
|
||||
}, SHUTDOWN_TIMEOUT);
|
||||
try {
|
||||
server.close();
|
||||
await ServiceRegistry.get(JobQueue.SERVICE_NAME).obliterate();
|
||||
await ServiceRegistry.get(JobQueue.SERVICE_NAME).shutdown();
|
||||
await ServiceRegistry.get(MongoDB.SERVICE_NAME).disconnect();
|
||||
await ServiceRegistry.get(RedisService.SERVICE_NAME).flushall();
|
||||
logger.info({ message: "Graceful shutdown complete" });
|
||||
process.exit(0);
|
||||
} catch (error) {
|
||||
@@ -182,23 +179,23 @@ const startApp = async () => {
|
||||
stringService
|
||||
);
|
||||
|
||||
const redisService = await RedisService.createInstance({
|
||||
logger,
|
||||
IORedis,
|
||||
SettingsService: settingsService,
|
||||
});
|
||||
const redisService = new RedisService({ Redis: IORedis, logger });
|
||||
|
||||
const jobQueue = new JobQueue({
|
||||
db,
|
||||
statusService,
|
||||
networkService,
|
||||
notificationService,
|
||||
settingsService,
|
||||
stringService,
|
||||
logger,
|
||||
const jobQueueHelper = new JobQueueHelper({
|
||||
redisService,
|
||||
Queue,
|
||||
Worker,
|
||||
redisService,
|
||||
logger,
|
||||
db,
|
||||
networkService,
|
||||
statusService,
|
||||
notificationService,
|
||||
});
|
||||
const jobQueue = await JobQueue.create({
|
||||
db,
|
||||
jobQueueHelper,
|
||||
logger,
|
||||
stringService,
|
||||
});
|
||||
|
||||
// Register services
|
||||
@@ -310,8 +307,6 @@ const startApp = async () => {
|
||||
);
|
||||
const notificationRoutes = new NotificationRoutes(notificationController);
|
||||
const diagnosticRoutes = new DiagnosticRoutes(diagnosticController);
|
||||
// Init job queue
|
||||
await jobQueue.initJobQueue();
|
||||
// Middleware
|
||||
app.use(responseHandler);
|
||||
app.use(
|
||||
|
||||
@@ -0,0 +1,310 @@
|
||||
const QUEUE_NAMES = ["uptime", "pagespeed", "hardware", "distributed"];
const SERVICE_NAME = "JobQueue";
const HEALTH_CHECK_INTERVAL = 10 * 60 * 1000; // 10 minutes
// A queue that holds jobs but has not processed one for longer than this is reported as stuck
const STUCK_THRESHOLD = 10 * 1000; // 10 seconds
// Maps a monitor type to the name of the queue that processes it
const QUEUE_LOOKUP = {
	hardware: "hardware",
	http: "uptime",
	ping: "uptime",
	port: "uptime",
	docker: "uptime",
	pagespeed: "pagespeed",
	distributed_http: "distributed",
};
const getSchedulerId = (monitor) => `scheduler:${monitor.type}:${monitor._id}`;

// Tags an error with this service and the failing method unless it was already tagged upstream
const tagError = (error, method) => {
	if (error.service === undefined) error.service = SERVICE_NAME;
	if (error.method === undefined) error.method = method;
	return error;
};

/**
 * Owns one queue and one worker per entry in QUEUE_NAMES and schedules a
 * repeating job for every active monitor.
 *
 * Queues and workers are built by the injected `jobQueueHelper`; this class only
 * orchestrates their lifecycle (init, health check, flush, shutdown).
 */
class JobQueue {
	static SERVICE_NAME = SERVICE_NAME;

	/**
	 * @param {Object} deps
	 * @param {Object} deps.db - Must expose `getAllMonitors()`
	 * @param {Object} deps.jobQueueHelper - Must expose `createQueue`, `createWorker` and `flushRedis`
	 * @param {Object} deps.logger - Must expose `info`, `warn` and `error`
	 * @param {Object} deps.stringService - Source of log message strings
	 */
	constructor({ db, jobQueueHelper, logger, stringService }) {
		this.db = db;
		this.jobQueueHelper = jobQueueHelper;
		this.stringService = stringService;
		this.logger = logger;
		this.queues = {};
		this.workers = [];
		this.healthCheckInterval = null;
	}

	/**
	 * Async factory: constructs an instance and runs `init()` on it.
	 * @returns {Promise<JobQueue>}
	 */
	static async create({ db, jobQueueHelper, logger, stringService }) {
		const instance = new JobQueue({ db, jobQueueHelper, logger, stringService });
		await instance.init();
		return instance;
	}

	/**
	 * Creates queues and workers, schedules a job for every active monitor and
	 * (re)starts the periodic health check.
	 *
	 * Failures are logged and NOT rethrown, so this never rejects; a monitor whose
	 * job cannot be added is logged individually and does not stop the others.
	 * @returns {Promise<void>}
	 */
	async init() {
		try {
			await this.initQueues();
			await this.initWorkers();

			const monitors = await this.db.getAllMonitors();
			await Promise.all(
				monitors
					.filter((monitor) => monitor.isActive)
					.map(async (monitor) => {
						try {
							await this.addJob(monitor._id, monitor);
						} catch (error) {
							this.logger.error({
								message: error.message,
								service: SERVICE_NAME,
								method: "init",
								stack: error.stack,
							});
						}
					})
			);

			this.startHealthCheck();
		} catch (error) {
			this.logger.error({
				message: error.message,
				service: SERVICE_NAME,
				method: "init",
				stack: error.stack,
			});
		}
	}

	/**
	 * Starts the periodic health check, replacing any timer that is already running.
	 * `init()` runs again on every flush, so without the clear each flush would
	 * leave one more timer behind.
	 */
	startHealthCheck() {
		if (this.healthCheckInterval) {
			clearInterval(this.healthCheckInterval);
		}
		// runHealthCheck handles its own errors, so the returned promise needs no handler
		this.healthCheckInterval = setInterval(
			() => this.runHealthCheck(),
			HEALTH_CHECK_INTERVAL
		);
	}

	/**
	 * Runs one health check and flushes all queues when any queue reports as stuck.
	 * Errors are logged, never thrown.
	 * @returns {Promise<void>}
	 */
	async runHealthCheck() {
		try {
			const queueHealthChecks = await this.checkQueueHealth();
			const queueIsStuck = queueHealthChecks.some((healthCheck) => healthCheck.stuck);
			if (queueIsStuck) {
				this.logger.warn({
					message: "Queue is stuck",
					service: SERVICE_NAME,
					method: "periodicHealthCheck",
					details: queueHealthChecks,
				});
				await this.flushQueues();
			}
		} catch (error) {
			this.logger.error({
				message: error.message,
				service: SERVICE_NAME,
				method: "periodicHealthCheck",
				stack: error.stack,
			});
		}
	}

	/**
	 * Creates one queue per QUEUE_NAMES entry and waits until all are ready.
	 * @returns {Promise<void>}
	 */
	async initQueues() {
		const readyPromises = [];

		for (const queueName of QUEUE_NAMES) {
			const q = this.jobQueueHelper.createQueue(queueName);
			this.queues[queueName] = q;
			readyPromises.push(q.waitUntilReady());
		}
		await Promise.all(readyPromises);
		this.logger.info({
			message: "Queues ready",
			service: SERVICE_NAME,
			method: "initQueues",
		});
	}

	/**
	 * Creates one worker per QUEUE_NAMES entry and waits until all are ready.
	 * @returns {Promise<void>}
	 */
	async initWorkers() {
		const workerReadyPromises = [];

		for (const queueName of QUEUE_NAMES) {
			const worker = this.jobQueueHelper.createWorker(queueName, this.queues[queueName]);
			this.workers.push(worker);
			workerReadyPromises.push(worker.waitUntilReady());
		}
		await Promise.all(workerReadyPromises);
		this.logger.info({
			message: "Workers ready",
			service: SERVICE_NAME,
			method: "initWorkers",
		});
	}

	/**
	 * Upserts the repeating job scheduler for a monitor on the queue matching its type.
	 * @param {*} jobName - Name given to the jobs the scheduler produces
	 * @param {Object} monitor - Monitor document; `type` selects the queue, `interval` (default 60000) the period
	 * @throws {Error} If no queue exists for `monitor.type`
	 * @returns {Promise<void>}
	 */
	async addJob(jobName, monitor) {
		this.logger.info({
			message: `Adding job ${monitor?.url ?? "No URL"}`,
			service: SERVICE_NAME,
			method: "addJob",
		});

		const queueName = QUEUE_LOOKUP[monitor.type];
		const queue = this.queues[queueName];
		if (typeof queue === "undefined") {
			throw new Error(`Queue for ${monitor.type} not found`);
		}
		const jobTemplate = {
			name: jobName,
			data: monitor,
			opts: {
				attempts: 1,
				backoff: {
					type: "exponential",
					delay: 1000,
				},
				removeOnComplete: true,
				removeOnFail: false,
				timeout: 1 * 60 * 1000,
			},
		};

		const schedulerId = getSchedulerId(monitor);
		await queue.upsertJobScheduler(
			schedulerId,
			{ every: monitor?.interval ?? 60000 },
			jobTemplate
		);
	}

	/**
	 * Removes the job scheduler belonging to a monitor.
	 * @param {Object} monitor - Monitor document (`type` and `_id` are read)
	 * @throws {Error} If no queue exists for `monitor.type`, or the removal itself fails;
	 *   the error is tagged with `service` and `method`
	 * @returns {Promise<boolean>} true when the scheduler was removed, false otherwise
	 */
	async deleteJob(monitor) {
		try {
			const queue = this.queues[QUEUE_LOOKUP[monitor.type]];
			// Same explicit error as addJob instead of a TypeError on `undefined`
			if (typeof queue === "undefined") {
				throw new Error(`Queue for ${monitor.type} not found`);
			}
			const schedulerId = getSchedulerId(monitor);
			const wasDeleted = await queue.removeJobScheduler(schedulerId);

			if (wasDeleted === true) {
				this.logger.info({
					message: this.stringService.jobQueueDeleteJob,
					service: SERVICE_NAME,
					method: "deleteJob",
					details: `Deleted job ${monitor._id}`,
				});
				return true;
			}

			this.logger.error({
				message: this.stringService.jobQueueDeleteJob,
				service: SERVICE_NAME,
				method: "deleteJob",
				details: `Failed to delete job ${monitor._id}`,
			});
			return false;
		} catch (error) {
			throw tagError(error, "deleteJob");
		}
	}

	/**
	 * Lists the jobs currently held by every queue.
	 * @throws {Error} Any queue error, tagged with `service` and `method`
	 * @returns {Promise<Object>} `{ [queueName]: { jobs: [{ url, state, progress }] } }`, keyed in QUEUE_NAMES order
	 */
	async getJobs() {
		try {
			const entries = await Promise.all(
				QUEUE_NAMES.map(async (name) => {
					const jobs = await this.queues[name].getJobs();
					const summaries = await Promise.all(
						jobs.map(async (job) => {
							const state = await job.getState();
							return { url: job.data.url, state, progress: job.progress };
						})
					);
					return [name, { jobs: summaries }];
				})
			);
			// Built from ordered entries so key order does not depend on which queue answers first
			return Object.fromEntries(entries);
		} catch (error) {
			throw tagError(error, "getJobs");
		}
	}

	/**
	 * Collects job counts for every queue.
	 *
	 * On failure the error is logged and the method resolves to `undefined`
	 * (it does not throw).
	 * @returns {Promise<Object|undefined>} `{ [queueName]: { waiting, active, failed, delayed, repeatableJobs } }`
	 */
	async getMetrics() {
		try {
			const entries = await Promise.all(
				QUEUE_NAMES.map(async (name) => {
					const queue = this.queues[name];
					const [waiting, active, failed, delayed, repeatableJobs] = await Promise.all([
						queue.getWaitingCount(),
						queue.getActiveCount(),
						queue.getFailedCount(),
						queue.getDelayedCount(),
						queue.getRepeatableJobs(),
					]);

					return [
						name,
						{
							waiting,
							active,
							failed,
							delayed,
							repeatableJobs: repeatableJobs.length,
						},
					];
				})
			);

			return Object.fromEntries(entries);
		} catch (error) {
			this.logger.error({
				message: error.message,
				service: SERVICE_NAME,
				method: "getMetrics",
				stack: error.stack,
			});
		}
	}

	/**
	 * Reports, per queue, how long ago a job was last processed and whether the
	 * queue looks stuck (it holds jobs but none was processed within STUCK_THRESHOLD).
	 *
	 * NOTE(review): the threshold is shorter than the default 60000 ms monitor
	 * interval used by addJob, so an idle-but-healthy queue may be reported as
	 * stuck - confirm the intended value.
	 * @returns {Promise<Array<{queueName: string, timeSinceLastJob: number, stuck: boolean, jobCounts: Object}>>}
	 */
	async checkQueueHealth() {
		const res = [];
		for (const queueName of QUEUE_NAMES) {
			const q = this.queues[queueName];
			await q.waitUntilReady();

			// lastJobProcessedTime is a timestamp the helper keeps on the queue object
			const timeDiff = Date.now() - q.lastJobProcessedTime;

			const jobCounts = await q.getJobCounts();
			const hasJobs = Object.values(jobCounts).some((count) => count > 0);

			res.push({
				queueName,
				timeSinceLastJob: timeDiff,
				stuck: hasJobs && timeDiff > STUCK_THRESHOLD,
				jobCounts,
			});
		}
		return res;
	}

	/**
	 * Closes every worker, then obliterates and closes every queue, leaving the
	 * instance with no workers and no queues. Shared by flushQueues and shutdown.
	 * @returns {Promise<void>}
	 */
	async closeWorkersAndQueues() {
		for (const worker of this.workers) {
			await worker.close();
		}
		this.workers = [];

		for (const queue of Object.values(this.queues)) {
			await queue.obliterate();
			// The queue object is discarded after this, so release it as well
			await queue.close();
		}
		this.queues = {};
	}

	/**
	 * Tears down all workers and queues and rebuilds them through `init()`.
	 * If the teardown fails, falls back to flushing Redis via the helper.
	 * @returns {Promise<*>} true on success, otherwise whatever `jobQueueHelper.flushRedis()` returns
	 */
	async flushQueues() {
		try {
			this.logger.warn({
				message: "Flushing queues",
				method: "flushQueues",
				service: SERVICE_NAME,
			});
			await this.closeWorkersAndQueues();
			await this.init();
			return true;
		} catch (error) {
			this.logger.warn({
				message: `${error.message} - Flushing redis manually`,
				service: SERVICE_NAME,
				method: "flushQueues",
			});
			return await this.jobQueueHelper.flushRedis();
		}
	}

	/**
	 * Stops the health check and tears down all workers and queues.
	 * @throws {Error} Propagates any error raised while closing workers or queues
	 * @returns {Promise<void>}
	 */
	async shutdown() {
		if (this.healthCheckInterval) {
			clearInterval(this.healthCheckInterval);
			this.healthCheckInterval = null;
		}
		await this.closeWorkersAndQueues();
	}
}
|
||||
|
||||
export default JobQueue;
|
||||
@@ -0,0 +1,323 @@
|
||||
const SERVICE_NAME = "JobQueueHelper";

// Events logged for every queue: [event, logger level, optional formatter for the message suffix]
const QUEUE_EVENTS = [
	["cleaned", "debug", (jobs, type) => ` with jobs: ${jobs} and type: ${type}`],
	["error", "error", (err) => ` with msg: ${err}`],
	["ioredis:close", "debug"],
	["paused", "debug"],
	["progress", "debug", (_job, progress) => ` with msg: ${progress}`],
	["removed", "debug", (job) => ` with msg: ${job}`],
	["resumed", "debug"],
	["waiting", "debug"],
];

// Events logged for every worker, same tuple layout as QUEUE_EVENTS
const WORKER_EVENTS = [
	["active", "debug"],
	["closed", "debug"],
	["closing", "debug", (msg) => ` with msg: ${msg}`],
	["completed", "debug"],
	["drained", "debug"],
	["error", "error", (failedReason) => ` with msg: ${failedReason}`],
	["failed", "warn", (_job, error) => ` with msg: ${error}`],
	["ioredis:close", "debug"],
	["paused", "debug"],
	["progress", "debug", (_job, progress) => ` with msg: ${progress}`],
	["ready", "debug"],
	["resumed", "debug"],
	["stalled", "warn"],
];

// Returns true when `now` (epoch ms) falls inside the window or one of its repeats
const isWindowActiveAt = (maintenanceWindow, now) => {
	if (!maintenanceWindow.active) return false;

	const start = new Date(maintenanceWindow.start).getTime();
	const end = new Date(maintenanceWindow.end).getTime();
	// Unparseable dates and windows that have not started yet are never active
	if (Number.isNaN(start) || Number.isNaN(end) || start > now) return false;
	if (end >= now) return true;

	// `repeat` is added to epoch milliseconds, so it is a period in ms
	const repeat = Number(maintenanceWindow.repeat) || 0;
	// Guarding against non-positive periods also removes the endless loop a negative value caused
	if (repeat <= 0) return false;

	// Jump straight to the latest repeat that has already started; it has the
	// latest end of all started repeats, so it is the only one worth checking
	const repeatsElapsed = Math.floor((now - start) / repeat);
	return end + repeatsElapsed * repeat >= now;
};

/**
 * Builds the queues and workers used by the job queue and holds the job
 * processing logic (maintenance check, status check, status update, notifications).
 */
class JobQueueHelper {
	/**
	 * @param {Object} deps
	 * @param {Object} deps.redisService - Must expose `getNewConnection(options?)`
	 * @param {Function} deps.Queue - Queue constructor, called as `new Queue(name, { connection })`
	 * @param {Function} deps.Worker - Worker constructor, called as `new Worker(name, processor, options)`
	 * @param {Object} deps.logger - Must expose `debug`, `info`, `warn` and `error`
	 * @param {Object} deps.db - Must expose `getMaintenanceWindowsByMonitorId(monitorId)`
	 * @param {Object} deps.networkService - Must expose `getStatus(job)`
	 * @param {Object} deps.statusService - Must expose `updateStatus(networkResponse)`
	 * @param {Object} deps.notificationService - Must expose `handleNotifications(payload)`
	 */
	constructor({
		redisService,
		Queue,
		Worker,
		logger,
		db,
		networkService,
		statusService,
		notificationService,
	}) {
		this.db = db;
		this.redisService = redisService;
		this.Queue = Queue;
		this.Worker = Worker;
		this.logger = logger;
		this.networkService = networkService;
		this.statusService = statusService;
		this.notificationService = notificationService;
	}

	/**
	 * Registers one logging listener per entry of `events` on `emitter`.
	 * @param {Object} emitter - Object exposing `on(event, listener)`
	 * @param {string} kind - Leading word of the log message ("Queue" or "Worker")
	 * @param {string} queueName - Queue name included in the message
	 * @param {string} scope - Prefix of the logged `method` field
	 * @param {Array} events - Tuples of `[event, level, formatSuffix?]`
	 */
	attachEventLoggers(emitter, kind, queueName, scope, events) {
		for (const [event, level, formatSuffix] of events) {
			emitter.on(event, (...args) => {
				const suffix = formatSuffix ? formatSuffix(...args) : "";
				this.logger[level]({
					message: `${kind} ${queueName} is ${event}${suffix}`,
					service: SERVICE_NAME,
					method: `${scope}:${event}`,
				});
			});
		}
	}

	/**
	 * Creates a queue on a fresh Redis connection, stamps it with
	 * `lastJobProcessedTime` and attaches logging listeners.
	 * @param {string} queueName
	 * @returns {Object} The queue instance
	 */
	createQueue(queueName) {
		const connection = this.redisService.getNewConnection();
		const q = new this.Queue(queueName, {
			connection,
		});
		// Read back by the job handler and by the queue health check
		q.lastJobProcessedTime = Date.now();
		this.attachEventLoggers(q, "Queue", queueName, "createQueue", QUEUE_EVENTS);
		return q;
	}

	/**
	 * Creates a worker for `queueName` on a fresh Redis connection and attaches
	 * logging listeners.
	 * @param {string} queueName
	 * @param {Object} queue - Queue whose `lastJobProcessedTime` the job handler updates
	 * @returns {Object} The worker instance
	 */
	createWorker(queueName, queue) {
		const connection = this.redisService.getNewConnection({
			maxRetriesPerRequest: null,
		});
		const worker = new this.Worker(queueName, this.createJobHandler(queue), {
			connection,
			concurrency: 50,
		});
		this.attachEventLoggers(worker, "Worker", queueName, "createWorker", WORKER_EVENTS);
		return worker;
	}

	/**
	 * Tells whether a monitor is inside any of its active maintenance windows.
	 *
	 * A window counts when it is flagged `active` and the current time lies
	 * between its start and end, or between the start and end of one of its
	 * repeats (`repeat` is the period in milliseconds). One matching window is
	 * enough, regardless of the order the windows are returned in.
	 * @param {*} monitorId
	 * @throws {Error} Propagates errors from the database lookup
	 * @returns {Promise<boolean>}
	 */
	async isInMaintenanceWindow(monitorId) {
		const maintenanceWindows = await this.db.getMaintenanceWindowsByMonitorId(monitorId);
		const now = Date.now();
		return maintenanceWindows.some((maintenanceWindow) =>
			isWindowActiveAt(maintenanceWindow, now)
		);
	}

	/**
	 * Builds the processor function run by a worker for each job.
	 *
	 * The processor resolves to `false` when the monitor is in maintenance or no
	 * network response was obtained, and to `true` otherwise. Any error is logged
	 * and rethrown.
	 * @param {Object} q - Queue whose `lastJobProcessedTime` is refreshed on every job
	 * @returns {Function} `async (job) => boolean`
	 */
	createJobHandler(q) {
		return async (job) => {
			try {
				// Update the last job processed time for this queue
				q.lastJobProcessedTime = Date.now();
				await job.updateProgress(0);
				const monitorId = job.data._id;
				const maintenanceWindowActive = await this.isInMaintenanceWindow(monitorId);

				// If a maintenance window is active, we're done
				if (maintenanceWindowActive) {
					await job.updateProgress(100);
					this.logger.info({
						message: `Monitor ${monitorId} is in maintenance window`,
						service: SERVICE_NAME,
						method: "createJobHandler",
					});
					return false;
				}

				// Get the current status
				await job.updateProgress(30);
				const networkResponse = await this.networkService.getStatus(job);
				// Distributed monitors stop here: no status update or notification is done for them
				if (
					job.data.type === "distributed_http" ||
					job.data.type === "distributed_test"
				) {
					await job.updateProgress(100);
					return true;
				}

				// If the network response is not found, we're done
				if (!networkResponse) {
					await job.updateProgress(100);
					return false;
				}

				// Handle status change
				await job.updateProgress(60);
				const { monitor, statusChanged, prevStatus } =
					await this.statusService.updateStatus(networkResponse);

				// Handle notifications; deliberately not awaited, failures are only logged
				await job.updateProgress(80);
				this.notificationService
					.handleNotifications({
						...networkResponse,
						monitor,
						prevStatus,
						statusChanged,
					})
					.catch((error) => {
						this.logger.error({
							message: error.message,
							service: SERVICE_NAME,
							method: "createJobHandler",
							details: `Error sending notifications for job ${job.id}: ${error.message}`,
							stack: error.stack,
						});
					});
				await job.updateProgress(100);
				return true;
			} catch (error) {
				this.logger.error({
					message: error.message,
					service: error.service ?? SERVICE_NAME,
					method: error.method ?? "createJobHandler",
					details: `Error processing job ${job.id}: ${error.message}`,
					stack: error.stack,
				});
				throw error;
			}
		};
	}

	/**
	 * Runs `flushall` on a dedicated Redis connection and releases that
	 * connection afterwards.
	 * @returns {Promise<*>} The `flushall` result, or `false` when it failed (the failure is logged)
	 */
	async flushRedis() {
		let connection;
		try {
			connection = this.redisService.getNewConnection();
			return await connection.flushall();
		} catch (error) {
			this.logger.warn({
				message: error.message,
				service: SERVICE_NAME,
				method: "flushRedis",
			});
			return false;
		} finally {
			// The connection was opened only for this call; leaving it open leaks a socket per flush
			if (connection) {
				try {
					await connection.quit();
				} catch {
					connection.disconnect();
				}
			}
		}
	}
}
|
||||
|
||||
export default JobQueueHelper;
|
||||
@@ -1,814 +0,0 @@
|
||||
const QUEUE_NAMES = ["uptime", "pagespeed", "hardware", "distributed"];
|
||||
const SERVICE_NAME = "JobQueue";
|
||||
const JOBS_PER_WORKER = 5;
|
||||
const HEALTH_CHECK_INTERVAL = 10 * 60 * 1000; // 10 minutes
|
||||
const QUEUE_LOOKUP = {
|
||||
hardware: "hardware",
|
||||
http: "uptime",
|
||||
ping: "uptime",
|
||||
port: "uptime",
|
||||
docker: "uptime",
|
||||
pagespeed: "pagespeed",
|
||||
distributed_http: "distributed",
|
||||
};
|
||||
const getSchedulerId = (monitor) => `scheduler:${monitor.type}:${monitor._id}`;
|
||||
|
||||
class NewJobQueue {
|
||||
static SERVICE_NAME = SERVICE_NAME;
|
||||
|
||||
constructor({
|
||||
db,
|
||||
statusService,
|
||||
networkService,
|
||||
notificationService,
|
||||
settingsService,
|
||||
stringService,
|
||||
logger,
|
||||
Queue,
|
||||
Worker,
|
||||
redisService,
|
||||
}) {
|
||||
this.connection = redisService.getConnection();
|
||||
this.queues = {};
|
||||
this.workers = {};
|
||||
this.lastJobProcessedTime = {};
|
||||
|
||||
this.db = db;
|
||||
this.networkService = networkService;
|
||||
this.statusService = statusService;
|
||||
this.notificationService = notificationService;
|
||||
this.settingsService = settingsService;
|
||||
this.logger = logger;
|
||||
this.Worker = Worker;
|
||||
this.stringService = stringService;
|
||||
|
||||
QUEUE_NAMES.forEach((name) => {
|
||||
const q = new Queue(name, { connection: this.connection });
|
||||
this.lastJobProcessedTime[q.name] = Date.now();
|
||||
q.on("error", (error) => {
|
||||
this.logger.error({
|
||||
message: error.message,
|
||||
service: SERVICE_NAME,
|
||||
method: "queue:error",
|
||||
stack: error.stack,
|
||||
});
|
||||
});
|
||||
this.queues[name] = q;
|
||||
|
||||
this.workers[name] = [];
|
||||
});
|
||||
|
||||
this.healthCheckInterval = setInterval(async () => {
|
||||
try {
|
||||
const health = await this.checkQueueHealth();
|
||||
if (health.stuck === true) {
|
||||
this.logger.error({
|
||||
message: `Queue is stuck: ${health.stuckQueues.join(", ")}`,
|
||||
service: SERVICE_NAME,
|
||||
method: "healthCheckInterval",
|
||||
});
|
||||
await this.flushQueue();
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error({
|
||||
message: error.message,
|
||||
service: SERVICE_NAME,
|
||||
method: "periodicHealthCheck",
|
||||
stack: error.stack,
|
||||
});
|
||||
}
|
||||
}, HEALTH_CHECK_INTERVAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes job queues by adding jobs for all active monitors
|
||||
* @async
|
||||
* @function initJobQueue
|
||||
* @description Retrieves all monitors from the database and adds jobs for active ones to their respective queues
|
||||
* @throws {Error} If there's an error retrieving monitors or adding jobs
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async initJobQueue() {
|
||||
await this.connection.flushall();
|
||||
const monitors = await this.db.getAllMonitors();
|
||||
await Promise.all(
|
||||
monitors
|
||||
.filter((monitor) => monitor.isActive)
|
||||
.map(async (monitor) => {
|
||||
try {
|
||||
await this.addJob(monitor._id, monitor);
|
||||
} catch (error) {
|
||||
this.logger.error({
|
||||
message: `Failed to add job for monitor ${monitor._id}:`,
|
||||
service: SERVICE_NAME,
|
||||
method: "initJobQueue",
|
||||
stack: error.stack,
|
||||
});
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a monitor is currently in a maintenance window
|
||||
* @async
|
||||
* @param {string} monitorId - The ID of the monitor to check
|
||||
* @returns {Promise<boolean>} Returns true if the monitor is in an active maintenance window, false otherwise
|
||||
* @throws {Error} If there's an error retrieving maintenance windows from the database
|
||||
* @description
|
||||
* Retrieves all maintenance windows for a monitor and checks if any are currently active.
|
||||
* A maintenance window is considered active if:
|
||||
* 1. The window is marked as active AND
|
||||
* 2. Either:
|
||||
* - Current time falls between start and end times
|
||||
* - For repeating windows: Current time falls between any repeated interval
|
||||
*/
|
||||
async isInMaintenanceWindow(monitorId) {
|
||||
const maintenanceWindows = await this.db.getMaintenanceWindowsByMonitorId(monitorId);
|
||||
// Check for active maintenance window:
|
||||
const maintenanceWindowIsActive = maintenanceWindows.reduce((acc, window) => {
|
||||
if (window.active) {
|
||||
const start = new Date(window.start);
|
||||
const end = new Date(window.end);
|
||||
const now = new Date();
|
||||
const repeatInterval = window.repeat || 0;
|
||||
|
||||
// If start is < now and end > now, we're in maintenance
|
||||
if (start <= now && end >= now) return true;
|
||||
|
||||
// If maintenance window was set in the past with a repeat,
|
||||
// we need to advance start and end to see if we are in range
|
||||
|
||||
while (start < now && repeatInterval !== 0) {
|
||||
start.setTime(start.getTime() + repeatInterval);
|
||||
end.setTime(end.getTime() + repeatInterval);
|
||||
if (start <= now && end >= now) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return acc;
|
||||
}, false);
|
||||
return maintenanceWindowIsActive;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a job processing handler for monitor checks
|
||||
* @function createJobHandler
|
||||
* @returns {Function} An async function that processes monitor check jobs
|
||||
* @description
|
||||
* Creates and returns a job handler that:
|
||||
* 1. Checks if monitor is in maintenance window
|
||||
* 2. If not in maintenance, performs network status check
|
||||
* 3. Updates monitor status in database
|
||||
* 4. Triggers notifications if status changed
|
||||
*
|
||||
* @param {Object} job - The job to process
|
||||
* @param {Object} job.data - The monitor data
|
||||
* @param {string} job.data._id - Monitor ID
|
||||
* @param {string} job.id - Job ID
|
||||
*
|
||||
* @throws {Error} Logs errors but doesn't throw them to prevent job failure
|
||||
* @returns {Promise<void>} Resolves when job processing is complete
|
||||
*/
|
||||
createJobHandler() {
|
||||
return async (job) => {
|
||||
try {
|
||||
// Update the last job processed time for this queue
|
||||
this.lastJobProcessedTime[job.queue.name] = Date.now();
|
||||
// Get all maintenance windows for this monitor
|
||||
await job.updateProgress(0);
|
||||
const monitorId = job.data._id;
|
||||
const maintenanceWindowActive = await this.isInMaintenanceWindow(monitorId);
|
||||
// If a maintenance window is active, we're done
|
||||
|
||||
if (maintenanceWindowActive) {
|
||||
await job.updateProgress(100);
|
||||
this.logger.info({
|
||||
message: `Monitor ${monitorId} is in maintenance window`,
|
||||
service: SERVICE_NAME,
|
||||
method: "createWorker",
|
||||
});
|
||||
return false;
|
||||
}
|
||||
|
||||
// Get the current status
|
||||
await job.updateProgress(30);
|
||||
const networkResponse = await this.networkService.getStatus(job);
|
||||
if (
|
||||
job.data.type === "distributed_http" ||
|
||||
job.data.type === "distributed_test"
|
||||
) {
|
||||
return;
|
||||
}
|
||||
// Handle status change
|
||||
await job.updateProgress(60);
|
||||
const { monitor, statusChanged, prevStatus } =
|
||||
await this.statusService.updateStatus(networkResponse);
|
||||
// Handle notifications
|
||||
await job.updateProgress(80);
|
||||
this.notificationService
|
||||
.handleNotifications({
|
||||
...networkResponse,
|
||||
monitor,
|
||||
prevStatus,
|
||||
statusChanged,
|
||||
})
|
||||
.catch((error) => {
|
||||
this.logger.error({
|
||||
message: error.message,
|
||||
service: SERVICE_NAME,
|
||||
method: "createJobHandler",
|
||||
details: `Error sending notifications for job ${job.id}: ${error.message}`,
|
||||
stack: error.stack,
|
||||
});
|
||||
});
|
||||
await job.updateProgress(100);
|
||||
return true;
|
||||
} catch (error) {
|
||||
this.logger.error({
|
||||
message: error.message,
|
||||
service: error.service ?? SERVICE_NAME,
|
||||
method: error.method ?? "createJobHandler",
|
||||
details: `Error processing job ${job.id}: ${error.message}`,
|
||||
stack: error.stack,
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new worker for processing jobs in a queue
|
||||
* @param {Queue} queue - The BullMQ queue to create a worker for
|
||||
* @returns {Worker} A new BullMQ worker instance
|
||||
* @description
|
||||
* Creates and configures a new worker with:
|
||||
* - Queue-specific job handler
|
||||
* - Redis connection settings
|
||||
* - Default worker options
|
||||
* The worker processes jobs from the specified queue using the job handler
|
||||
* created by createJobHandler()
|
||||
*
|
||||
* @throws {Error} If worker creation fails or connection is invalid
|
||||
*/
|
||||
createWorker(queue) {
|
||||
try {
|
||||
const worker = new this.Worker(queue.name, this.createJobHandler(), {
|
||||
connection: this.connection,
|
||||
concurrency: 5,
|
||||
stalledInterval: 10000,
|
||||
maxStalledCount: 1,
|
||||
lockDuration: 60000,
|
||||
});
|
||||
|
||||
worker.on("failed", (job, err) => {
|
||||
this.logger.error({
|
||||
message: `Job ${job.id} failed: ${err.message}`,
|
||||
service: SERVICE_NAME,
|
||||
method: "worker:failed",
|
||||
stack: err.stack,
|
||||
jobData: job.data,
|
||||
});
|
||||
});
|
||||
worker.on("error", (job, err) => {
|
||||
this.logger.error({
|
||||
message: `Job ${job.id} error: ${err.message}`,
|
||||
service: SERVICE_NAME,
|
||||
method: "worker:error",
|
||||
stack: err.stack,
|
||||
jobData: job.data,
|
||||
});
|
||||
});
|
||||
|
||||
worker.on("stalled", (jobId) => {
|
||||
this.logger.warn({
|
||||
message: `Job ${jobId} stalled`,
|
||||
service: SERVICE_NAME,
|
||||
method: "worker:stalled",
|
||||
});
|
||||
});
|
||||
|
||||
return worker;
|
||||
} catch (error) {
|
||||
this.logger.error({
|
||||
message: error.message,
|
||||
service: SERVICE_NAME,
|
||||
method: "createWorker",
|
||||
stack: error.stack,
|
||||
});
|
||||
error.service = SERVICE_NAME;
|
||||
error.method = "createWorker";
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets stats related to the workers
|
||||
* This is used for scaling workers right now
|
||||
* In the future we will likely want to scale based on server performance metrics
|
||||
* CPU Usage & memory usage, if too high, scale down workers.
|
||||
* When to scale up? If jobs are taking too long to complete?
|
||||
* @async
|
||||
* @returns {Promise<WorkerStats>} - Returns the worker stats
|
||||
*/
|
||||
async getWorkerStats(queue) {
|
||||
try {
|
||||
const jobs = await queue.getRepeatableJobs();
|
||||
const load = jobs.length / this.workers[queue.name].length;
|
||||
return { jobs, load };
|
||||
} catch (error) {
|
||||
error.service === undefined ? (error.service = SERVICE_NAME) : null;
|
||||
error.method === undefined ? (error.method = "getWorkerStats") : null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Scales workers up or down based on queue load
|
||||
* @async
|
||||
* @param {Object} workerStats - Statistics about current worker load
|
||||
* @param {number} workerStats.load - Current load per worker
|
||||
* @param {Array} workerStats.jobs - Array of current jobs
|
||||
* @param {Queue} queue - The BullMQ queue to scale workers for
|
||||
* @returns {Promise<boolean>} True if scaling occurred, false if no scaling was needed
|
||||
* @throws {Error} If no workers array exists for the queue
|
||||
* @description
|
||||
* Scales workers based on these rules:
|
||||
* - Maintains minimum of 5 workers
|
||||
* - Adds workers if load exceeds JOBS_PER_WORKER
|
||||
* - Removes workers if load is below JOBS_PER_WORKER
|
||||
* - Creates initial workers if none exist
|
||||
* Worker scaling is calculated based on excess jobs or excess capacity
|
||||
*/
|
||||
async scaleWorkers(workerStats, queue) {
|
||||
const workers = this.workers[queue.name];
|
||||
if (workers === undefined) {
|
||||
throw new Error(`No workers found for ${queue.name}`);
|
||||
}
|
||||
|
||||
if (workers.length === 0) {
|
||||
// There are no workers, need to add one
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const worker = this.createWorker(queue);
|
||||
workers.push(worker);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (workerStats.load > JOBS_PER_WORKER) {
|
||||
// Find out how many more jobs we have than current workers can handle
|
||||
const excessJobs = workerStats.jobs.length - workers.length * JOBS_PER_WORKER;
|
||||
// Divide by jobs/worker to find out how many workers to add
|
||||
const workersToAdd = Math.ceil(excessJobs / JOBS_PER_WORKER);
|
||||
for (let i = 0; i < workersToAdd; i++) {
|
||||
const worker = this.createWorker(queue);
|
||||
workers.push(worker);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
if (workerStats.load < JOBS_PER_WORKER) {
|
||||
// Find out how much excess capacity we have
|
||||
const workerCapacity = workers.length * JOBS_PER_WORKER;
|
||||
const excessCapacity = workerCapacity - workerStats.jobs.length;
|
||||
// Calculate how many workers to remove
|
||||
let workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER); // Make sure there are always at least 5
|
||||
while (workersToRemove > 0 && workers.length > 5) {
|
||||
const worker = workers.pop();
|
||||
workersToRemove--;
|
||||
await worker.close().catch((error) => {
|
||||
// Catch the error instead of throwing it
|
||||
this.logger.error({
|
||||
message: error.message,
|
||||
service: SERVICE_NAME,
|
||||
method: "scaleWorkers",
|
||||
stack: error.stack,
|
||||
});
|
||||
});
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets all jobs in the queue.
|
||||
*
|
||||
* @async
|
||||
* @returns {Promise<Array<Job>>}
|
||||
* @throws {Error} - Throws error if getting jobs fails
|
||||
*/
|
||||
async getJobs(queue) {
|
||||
try {
|
||||
const jobs = await queue.getRepeatableJobs();
|
||||
return jobs;
|
||||
} catch (error) {
|
||||
error.service === undefined ? (error.service = SERVICE_NAME) : null;
|
||||
error.method === undefined ? (error.method = "getJobs") : null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves detailed statistics about jobs and workers for all queues
|
||||
* @async
|
||||
* @returns {Promise<Object>} Queue statistics object
|
||||
* @throws {Error} If there's an error retrieving job information
|
||||
* @description
|
||||
* Returns an object with statistics for each queue including:
|
||||
* - List of jobs with their URLs and current states
|
||||
* - Number of workers assigned to the queue
|
||||
*/
|
||||
async getJobStats() {
|
||||
try {
|
||||
let stats = {};
|
||||
await Promise.all(
|
||||
QUEUE_NAMES.map(async (name) => {
|
||||
const queue = this.queues[name];
|
||||
const jobs = await queue.getJobs();
|
||||
const ret = await Promise.all(
|
||||
jobs.map(async (job) => {
|
||||
const state = await job.getState();
|
||||
return { url: job.data.url, state, progress: job.progress };
|
||||
})
|
||||
);
|
||||
stats[name] = { jobs: ret, workers: this.workers[name].length };
|
||||
})
|
||||
);
|
||||
return stats;
|
||||
} catch (error) {
|
||||
error.service === undefined ? (error.service = SERVICE_NAME) : null;
|
||||
error.method === undefined ? (error.method = "getJobStats") : null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds both immediate and repeatable jobs to the appropriate queue
|
||||
* @async
|
||||
* @param {string} jobName - Name identifier for the job
|
||||
* @param {Object} monitor - Job data and configuration
|
||||
* @param {string} monitor.type - Type of monitor/queue ('uptime', 'pagespeed', 'hardware')
|
||||
* @param {string} [monitor.url] - URL to monitor (optional)
|
||||
* @param {number} [monitor.interval=60000] - Repeat interval in milliseconds
|
||||
* @param {string} monitor._id - Monitor ID
|
||||
* @throws {Error} If queue not found for payload type
|
||||
* @throws {Error} If job addition fails
|
||||
* @description
|
||||
* 1. Identifies correct queue based on payload type
|
||||
* 2. Adds immediate job execution
|
||||
* 3. Adds repeatable job with specified interval
|
||||
* 4. Scales workers based on updated queue load
|
||||
* Jobs are configured with exponential backoff, single attempt,
|
||||
* and automatic removal on completion
|
||||
*/
|
||||
async addJob(jobName, monitor) {
|
||||
try {
|
||||
this.logger.info({
|
||||
message: `Adding job ${monitor?.url ?? "No URL"}`,
|
||||
service: SERVICE_NAME,
|
||||
method: "addJob",
|
||||
});
|
||||
|
||||
// Find the correct queue
|
||||
|
||||
const queue = this.queues[QUEUE_LOOKUP[monitor.type]];
|
||||
if (queue === undefined) {
|
||||
throw new Error(`Queue for ${monitor.type} not found`);
|
||||
}
|
||||
|
||||
const jobTemplate = {
|
||||
name: jobName,
|
||||
data: monitor,
|
||||
opts: {
|
||||
attempts: 1,
|
||||
backoff: {
|
||||
type: "exponential",
|
||||
delay: 1000,
|
||||
},
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
timeout: 1 * 60 * 1000,
|
||||
},
|
||||
};
|
||||
|
||||
const schedulerId = getSchedulerId(monitor);
|
||||
await queue.upsertJobScheduler(
|
||||
schedulerId,
|
||||
{ every: monitor?.interval ?? 60000 },
|
||||
jobTemplate
|
||||
);
|
||||
|
||||
const workerStats = await this.getWorkerStats(queue);
|
||||
await this.scaleWorkers(workerStats, queue);
|
||||
} catch (error) {
|
||||
error.service === undefined ? (error.service = SERVICE_NAME) : null;
|
||||
error.method === undefined ? (error.method = "addJob") : null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes a repeatable job from its queue and adjusts worker scaling
|
||||
* @async
|
||||
* @param {Object} monitor - Monitor object containing job details
|
||||
* @param {string} monitor._id - ID of the monitor/job to delete
|
||||
* @param {string} monitor.type - Type of monitor determining queue selection
|
||||
* @param {number} monitor.interval - Job repeat interval in milliseconds
|
||||
* @throws {Error} If queue not found for monitor type
|
||||
* @throws {Error} If job deletion fails
|
||||
* @description
|
||||
* 1. Identifies correct queue based on monitor type
|
||||
* 2. Removes repeatable job using monitor ID and interval
|
||||
* 3. Logs success or failure of deletion
|
||||
* 4. Updates worker scaling based on new queue load
|
||||
* Returns void but logs operation result
|
||||
*/
|
||||
async deleteJob(monitor) {
|
||||
try {
|
||||
const queue = this.queues[QUEUE_LOOKUP[monitor.type]];
|
||||
const schedulerId = getSchedulerId(monitor);
|
||||
const wasDeleted = await queue.removeJobScheduler(schedulerId);
|
||||
|
||||
if (wasDeleted === true) {
|
||||
this.logger.info({
|
||||
message: this.stringService.jobQueueDeleteJob,
|
||||
service: SERVICE_NAME,
|
||||
method: "deleteJob",
|
||||
details: `Deleted job ${monitor._id}`,
|
||||
});
|
||||
const workerStats = await this.getWorkerStats(queue);
|
||||
await this.scaleWorkers(workerStats, queue);
|
||||
} else {
|
||||
this.logger.error({
|
||||
message: this.stringService.jobQueueDeleteJob,
|
||||
service: SERVICE_NAME,
|
||||
method: "deleteJob",
|
||||
details: `Failed to delete job ${monitor._id}`,
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
error.service === undefined ? (error.service = SERVICE_NAME) : null;
|
||||
error.method === undefined ? (error.method = "deleteJob") : null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves comprehensive metrics for all queues
|
||||
* @async
|
||||
* @returns {Promise<Object.<string, QueueMetrics>>} Object with metrics for each queue
|
||||
* @throws {Error} If metrics retrieval fails
|
||||
* @description
|
||||
* Collects the following metrics for each queue:
|
||||
* - Number of waiting jobs
|
||||
* - Number of active jobs
|
||||
* - Number of completed jobs
|
||||
* - Number of failed jobs
|
||||
* - Number of delayed jobs
|
||||
* - Number of repeatable jobs
|
||||
* - Number of active workers
|
||||
*
|
||||
* @typedef {Object} QueueMetrics
|
||||
* @property {number} waiting - Count of jobs waiting to be processed
|
||||
* @property {number} active - Count of jobs currently being processed
|
||||
* @property {number} completed - Count of successfully completed jobs
|
||||
* @property {number} failed - Count of failed jobs
|
||||
* @property {number} delayed - Count of delayed jobs
|
||||
* @property {number} repeatableJobs - Count of repeatable job patterns
|
||||
* @property {number} workers - Count of active workers for this queue
|
||||
*/
|
||||
async getMetrics() {
|
||||
try {
|
||||
let metrics = {};
|
||||
|
||||
await Promise.all(
|
||||
QUEUE_NAMES.map(async (name) => {
|
||||
const queue = this.queues[name];
|
||||
const workers = this.workers[name];
|
||||
const [waiting, active, completed, failed, delayed, repeatableJobs] =
|
||||
await Promise.all([
|
||||
queue.getWaitingCount(),
|
||||
queue.getActiveCount(),
|
||||
queue.getCompletedCount(),
|
||||
queue.getFailedCount(),
|
||||
queue.getDelayedCount(),
|
||||
queue.getRepeatableJobs(),
|
||||
]);
|
||||
|
||||
metrics[name] = {
|
||||
waiting,
|
||||
active,
|
||||
completed,
|
||||
failed,
|
||||
delayed,
|
||||
repeatableJobs: repeatableJobs.length,
|
||||
workers: workers.length,
|
||||
};
|
||||
})
|
||||
);
|
||||
|
||||
return metrics;
|
||||
} catch (error) {
|
||||
this.logger.error({
|
||||
message: error.message,
|
||||
service: SERVICE_NAME,
|
||||
method: "getMetrics",
|
||||
stack: error.stack,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @async
|
||||
* @returns {Promise<boolean>} - Returns true if obliteration is successful
|
||||
*/
|
||||
|
||||
async obliterate() {
|
||||
try {
|
||||
this.logger.info({
|
||||
message: "Attempting to obliterate job queue...",
|
||||
service: SERVICE_NAME,
|
||||
method: "obliterate",
|
||||
});
|
||||
|
||||
if (this.healthCheckInterval) {
|
||||
clearInterval(this.healthCheckInterval);
|
||||
this.healthCheckInterval = null;
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
QUEUE_NAMES.map(async (name) => {
|
||||
const queue = this.queues[name];
|
||||
await queue.pause();
|
||||
const jobs = await this.getJobs(queue);
|
||||
|
||||
// Remove all repeatable jobs
|
||||
for (const job of jobs) {
|
||||
await queue.removeRepeatableByKey(job.key);
|
||||
await queue.remove(job.id);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
// Close workers
|
||||
await Promise.all(
|
||||
QUEUE_NAMES.map(async (name) => {
|
||||
const workers = this.workers[name];
|
||||
await Promise.all(
|
||||
workers.map(async (worker) => {
|
||||
await worker.close();
|
||||
})
|
||||
);
|
||||
})
|
||||
);
|
||||
|
||||
QUEUE_NAMES.forEach(async (name) => {
|
||||
const queue = this.queues[name];
|
||||
await queue.obliterate();
|
||||
});
|
||||
|
||||
const metrics = await this.getMetrics();
|
||||
this.logger.info({
|
||||
message: this.stringService.jobQueueObliterate,
|
||||
service: SERVICE_NAME,
|
||||
method: "obliterate",
|
||||
details: metrics,
|
||||
});
|
||||
return true;
|
||||
} catch (error) {
|
||||
error.service === undefined ? (error.service = SERVICE_NAME) : null;
|
||||
error.method === undefined ? (error.method = "obliterate") : null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// **************************
|
||||
// Queue Health Checks
|
||||
// **************************
|
||||
async getKeyValuePairs() {
|
||||
try {
|
||||
// Get all keys
|
||||
const keys = await this.connection.keys("*");
|
||||
|
||||
if (keys.length === 0) {
|
||||
return {}; // Return an empty object if no keys are found
|
||||
}
|
||||
|
||||
// Get values for all keys
|
||||
const values = await this.connection.mget(keys);
|
||||
|
||||
// Combine keys and values into an object
|
||||
const keyValuePairs = keys.reduce((result, key, index) => {
|
||||
result[key] = values[index];
|
||||
return result;
|
||||
}, {});
|
||||
this.logger.info({
|
||||
message: "Redis key-value",
|
||||
service: SERVICE_NAME,
|
||||
method: "flushQueue",
|
||||
details: keyValuePairs,
|
||||
});
|
||||
return keyValuePairs;
|
||||
} catch (error) {
|
||||
error.service === undefined ? (error.service = SERVICE_NAME) : null;
|
||||
error.method === undefined ? (error.method = "getKeyValuePairs") : null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async flushQueue() {
|
||||
try {
|
||||
const keyValuePairs = await this.getKeyValuePairs();
|
||||
this.logger.info({
|
||||
message: "Before flush",
|
||||
service: SERVICE_NAME,
|
||||
method: "flushQueue",
|
||||
details: keyValuePairs,
|
||||
});
|
||||
const flushResult = await this.connection.flushall();
|
||||
const keyValuePairsAfter = await this.getKeyValuePairs();
|
||||
this.logger.info({
|
||||
message: "After flush",
|
||||
service: SERVICE_NAME,
|
||||
method: "flushQueue",
|
||||
details: keyValuePairsAfter,
|
||||
});
|
||||
if (flushResult !== "OK") {
|
||||
throw new Error("Failed to flush queue");
|
||||
}
|
||||
await this.initJobQueue();
|
||||
return {
|
||||
keyValuePairs,
|
||||
flush: flushResult,
|
||||
keyValuePairsAfter,
|
||||
init: true,
|
||||
};
|
||||
} catch (error) {
|
||||
error.service === undefined ? (error.service = SERVICE_NAME) : null;
|
||||
error.method === undefined ? (error.method = "flushQueue") : null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets metrics for a specific queue
|
||||
* @async
|
||||
* @function getQueueHealthMetrics
|
||||
* @param {Queue} queue - The queue to get metrics for
|
||||
* @returns {Promise<Object>} Queue metrics
|
||||
*/
|
||||
async getQueueHealthMetrics(queue) {
|
||||
return await queue.getJobCounts();
|
||||
}
|
||||
|
||||
getQueueIdleTimes() {
|
||||
const now = Date.now();
|
||||
const idleTimes = {};
|
||||
Object.entries(this.lastJobProcessedTime).forEach(([queueName, lastProcessed]) => {
|
||||
idleTimes[queueName] = now - lastProcessed;
|
||||
});
|
||||
return idleTimes;
|
||||
}
|
||||
async checkQueueHealth() {
|
||||
try {
|
||||
const currentTime = Date.now();
|
||||
const stuckQueues = [];
|
||||
const idleTimes = this.getQueueIdleTimes();
|
||||
for (const queueName of QUEUE_NAMES) {
|
||||
const queue = this.queues[queueName];
|
||||
|
||||
const jobCounts = await this.getQueueHealthMetrics(queue);
|
||||
const hasJobs = Object.values(jobCounts).some((count) => count > 0);
|
||||
|
||||
const timeSinceLastProcessed = currentTime - this.lastJobProcessedTime[queueName];
|
||||
const isStuck = hasJobs && timeSinceLastProcessed > HEALTH_CHECK_INTERVAL;
|
||||
|
||||
if (isStuck) {
|
||||
stuckQueues.push(queueName);
|
||||
}
|
||||
}
|
||||
|
||||
const queueHealth = { stuck: false, stuckQueues, idleTimes };
|
||||
|
||||
if (stuckQueues.length > 0) {
|
||||
queueHealth.stuck = true;
|
||||
}
|
||||
|
||||
this.logger.info({
|
||||
message: "Queue health check",
|
||||
service: SERVICE_NAME,
|
||||
method: "checkQueueHealth",
|
||||
details: queueHealth,
|
||||
});
|
||||
return queueHealth;
|
||||
} catch (error) {
|
||||
error.service === undefined ? (error.service = SERVICE_NAME) : null;
|
||||
error.method === undefined ? (error.method = "checkQueueHealth") : null;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default NewJobQueue;
|
||||
@@ -258,8 +258,8 @@ class NetworkService {
|
||||
if (dbSettings?.pagespeedApiKey) {
|
||||
pagespeedUrl += `&key=${dbSettings.pagespeedApiKey}`;
|
||||
} else {
|
||||
this.logger.info({
|
||||
message: "Pagespeed API key not found",
|
||||
this.logger.warn({
|
||||
message: "Pagespeed API key not found, job not executed",
|
||||
service: this.SERVICE_NAME,
|
||||
method: "requestPagespeed",
|
||||
details: { url },
|
||||
|
||||
@@ -1,71 +1,58 @@
|
||||
const SERVICE_NAME = "RedisService";
|
||||
|
||||
class RedisService {
|
||||
static SERVICE_NAME = "RedisService";
|
||||
|
||||
constructor({ logger, IORedis, SettingsService }) {
|
||||
static SERVICE_NAME = SERVICE_NAME;
|
||||
constructor({ Redis, logger }) {
|
||||
this.Redis = Redis;
|
||||
this.connections = new Set();
|
||||
this.logger = logger;
|
||||
this.IORedis = IORedis;
|
||||
this.SettingsService = SettingsService;
|
||||
this.connection = null;
|
||||
}
|
||||
|
||||
static async createInstance({ logger, IORedis, SettingsService }) {
|
||||
const instance = new RedisService({ logger, IORedis, SettingsService });
|
||||
await instance.connect();
|
||||
return instance;
|
||||
}
|
||||
|
||||
async connect() {
|
||||
const settings = this.SettingsService.getSettings();
|
||||
const { redisUrl } = settings;
|
||||
this.connection = new this.IORedis(redisUrl, {
|
||||
maxRetriesPerRequest: null,
|
||||
getNewConnection(options = {}) {
|
||||
const connection = new this.Redis(process.env.REDIS_URL, {
|
||||
retryStrategy: (times) => {
|
||||
if (times >= 5) {
|
||||
throw new Error("Failed to connect to Redis");
|
||||
}
|
||||
this.logger.debug({
|
||||
message: "Retrying Redis connection",
|
||||
service: RedisService.SERVICE_NAME,
|
||||
details: { times },
|
||||
});
|
||||
return Math.min(times * 100, 2000);
|
||||
return null;
|
||||
},
|
||||
...options,
|
||||
});
|
||||
this.connections.add(connection);
|
||||
return connection;
|
||||
}
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
let errorOccurred = false;
|
||||
|
||||
this.connection.on("ready", () => {
|
||||
if (!errorOccurred) {
|
||||
this.logger.info({
|
||||
message: "Redis connection established",
|
||||
service: RedisService.SERVICE_NAME,
|
||||
});
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
this.connection.on("error", (err) => {
|
||||
errorOccurred = true;
|
||||
async closeAllConnections() {
|
||||
const closePromises = Array.from(this.connections).map((conn) =>
|
||||
conn.quit().catch((err) => {
|
||||
this.logger.error({
|
||||
message: "Redis connection error",
|
||||
service: RedisService.SERVICE_NAME,
|
||||
error: err,
|
||||
message: "Error closing Redis connection",
|
||||
service: SERVICE_NAME,
|
||||
method: "closeAllConnections",
|
||||
details: { error: err },
|
||||
});
|
||||
setTimeout(() => reject(err), 5000);
|
||||
});
|
||||
})
|
||||
);
|
||||
|
||||
await Promise.all(closePromises);
|
||||
this.connections.clear();
|
||||
this.logger.info({
|
||||
message: "All Redis connections closed",
|
||||
service: SERVICE_NAME,
|
||||
method: "closeAllConnections",
|
||||
});
|
||||
}
|
||||
async flushall() {
|
||||
this.logger.debug({
|
||||
message: "Flushing all Redis data",
|
||||
service: RedisService.SERVICE_NAME,
|
||||
});
|
||||
await this.connection.flushall();
|
||||
}
|
||||
|
||||
getConnection() {
|
||||
return this.connection;
|
||||
async flushRedis() {
|
||||
this.logger.info({
|
||||
message: "Flushing Redis",
|
||||
service: SERVICE_NAME,
|
||||
method: "flushRedis",
|
||||
});
|
||||
const flushPromises = Array.from(this.connections).map((conn) => conn.flushall());
|
||||
await Promise.all(flushPromises);
|
||||
this.logger.info({
|
||||
message: "Redis flushed",
|
||||
service: SERVICE_NAME,
|
||||
method: "flushRedis",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user