Files
Checkmate/server/service/JobQueue/JobQueue.js
Alex Holliday bfa6832beb format
2025-07-21 10:34:16 -07:00

318 lines
7.5 KiB
JavaScript

const QUEUE_NAMES = ["uptime", "pagespeed", "hardware"];
const SERVICE_NAME = "JobQueue";
const HEALTH_CHECK_INTERVAL = 10 * 60 * 1000; // 10 minutes
const QUEUE_LOOKUP = {
hardware: "hardware",
http: "uptime",
ping: "uptime",
port: "uptime",
docker: "uptime",
pagespeed: "pagespeed",
};
const getSchedulerId = (monitor) => `scheduler:${monitor.type}:${monitor._id}`;
class JobQueue {
static SERVICE_NAME = SERVICE_NAME;
constructor({ db, jobQueueHelper, logger, stringService }) {
this.db = db;
this.jobQueueHelper = jobQueueHelper;
this.stringService = stringService;
this.logger = logger;
this.queues = {};
this.workers = [];
}
static async create({ db, jobQueueHelper, logger, stringService }) {
const instance = new JobQueue({ db, jobQueueHelper, logger, stringService });
await instance.init();
return instance;
}
async init() {
try {
await this.initQueues();
await this.initWorkers();
const monitors = await this.db.getAllMonitors();
await Promise.all(
monitors
.filter((monitor) => monitor.isActive)
.map(async (monitor) => {
try {
await this.addJob(monitor._id, monitor);
} catch (error) {
this.logger.error({
message: error.message,
service: SERVICE_NAME,
method: "initJobQueue",
stack: error.stack,
});
}
})
);
this.healthCheckInterval = setInterval(async () => {
try {
const queueHealthChecks = await this.checkQueueHealth();
const queueIsStuck = queueHealthChecks.some((healthCheck) => healthCheck.stuck);
if (queueIsStuck) {
this.logger.warn({
message: "Queue is stuck",
service: SERVICE_NAME,
method: "periodicHealthCheck",
details: queueHealthChecks,
});
await this.flushQueues();
}
} catch (error) {
this.logger.error({
message: error.message,
service: SERVICE_NAME,
method: "periodicHealthCheck",
stack: error.stack,
});
}
}, HEALTH_CHECK_INTERVAL);
} catch (error) {
this.logger.error({
message: error.message,
service: SERVICE_NAME,
method: "initJobQueue",
stack: error.stack,
});
}
}
async initQueues() {
const readyPromises = [];
for (const queueName of QUEUE_NAMES) {
const q = this.jobQueueHelper.createQueue(queueName);
this.queues[queueName] = q;
readyPromises.push(q.waitUntilReady());
}
await Promise.all(readyPromises);
this.logger.info({
message: "Queues ready",
service: SERVICE_NAME,
method: "initQueues",
});
}
async initWorkers() {
const workerReadyPromises = [];
for (const queueName of QUEUE_NAMES) {
const worker = this.jobQueueHelper.createWorker(queueName, this.queues[queueName]);
this.workers.push(worker);
workerReadyPromises.push(worker.waitUntilReady());
}
await Promise.all(workerReadyPromises);
this.logger.info({
message: "Workers ready",
service: SERVICE_NAME,
method: "initWorkers",
});
}
pauseJob = async (monitor) => {
this.deleteJob(monitor);
};
resumeJob = async (monitor) => {
this.addJob(monitor._id, monitor);
};
async addJob(jobName, monitor) {
this.logger.info({
message: `Adding job ${monitor?.url ?? "No URL"}`,
service: SERVICE_NAME,
method: "addJob",
});
const queueName = QUEUE_LOOKUP[monitor.type];
const queue = this.queues[queueName];
if (typeof queue === "undefined") {
throw new Error(`Queue for ${monitor.type} not found`);
}
const jobTemplate = {
name: jobName,
data: monitor,
opts: {
attempts: 1,
backoff: {
type: "exponential",
delay: 1000,
},
removeOnComplete: true,
removeOnFail: false,
timeout: 1 * 60 * 1000,
},
};
const schedulerId = getSchedulerId(monitor);
await queue.upsertJobScheduler(schedulerId, { every: monitor?.interval ?? 60000 }, jobTemplate);
}
async deleteJob(monitor) {
try {
const queue = this.queues[QUEUE_LOOKUP[monitor.type]];
const schedulerId = getSchedulerId(monitor);
const wasDeleted = await queue.removeJobScheduler(schedulerId);
if (wasDeleted === true) {
this.logger.info({
message: this.stringService.jobQueueDeleteJob,
service: SERVICE_NAME,
method: "deleteJob",
details: `Deleted job ${monitor._id}`,
});
return true;
} else {
this.logger.error({
message: this.stringService.jobQueueDeleteJob,
service: SERVICE_NAME,
method: "deleteJob",
details: `Failed to delete job ${monitor._id}`,
});
return false;
}
} catch (error) {
error.service === undefined ? (error.service = SERVICE_NAME) : null;
error.method === undefined ? (error.method = "deleteJob") : null;
throw error;
}
}
async updateJob(monitor) {
await this.deleteJob(monitor);
await this.addJob(monitor._id, monitor);
}
async getJobs() {
try {
let stats = {};
await Promise.all(
QUEUE_NAMES.map(async (name) => {
const queue = this.queues[name];
const jobs = await queue.getJobs();
const ret = await Promise.all(
jobs.map(async (job) => {
const state = await job.getState();
return { url: job.data.url, state, progress: job.progress };
})
);
stats[name] = { jobs: ret };
})
);
return stats;
} catch (error) {
error.service === undefined ? (error.service = SERVICE_NAME) : null;
error.method === undefined ? (error.method = "getJobStats") : null;
throw error;
}
}
async getMetrics() {
try {
let metrics = {};
await Promise.all(
QUEUE_NAMES.map(async (name) => {
const queue = this.queues[name];
const [waiting, active, failed, delayed, repeatableJobs] = await Promise.all([
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getFailedCount(),
queue.getDelayedCount(),
queue.getRepeatableJobs(),
]);
metrics[name] = {
waiting,
active,
failed,
delayed,
repeatableJobs: repeatableJobs.length,
};
})
);
return metrics;
} catch (error) {
this.logger.error({
message: error.message,
service: SERVICE_NAME,
method: "getMetrics",
stack: error.stack,
});
}
}
async checkQueueHealth() {
const res = [];
for (const queueName of QUEUE_NAMES) {
const q = this.queues[queueName];
await q.waitUntilReady();
const lastJobProcessedTime = q.lastJobProcessedTime;
const currentTime = Date.now();
const timeDiff = currentTime - lastJobProcessedTime;
// Check for jobs
const jobCounts = await q.getJobCounts();
const hasJobs = Object.values(jobCounts).some((count) => count > 0);
res.push({
queueName,
timeSinceLastJob: timeDiff,
stuck: hasJobs && timeDiff > 10000,
jobCounts,
});
}
return res;
}
async flushQueues() {
try {
this.logger.warn({
message: "Flushing queues",
method: "flushQueues",
service: SERVICE_NAME,
});
for (const worker of this.workers) {
await worker.close();
}
this.workers = [];
for (const queue of Object.values(this.queues)) {
await queue.obliterate();
}
this.queue = {};
await this.init();
return true;
} catch (error) {
this.logger.warn({
message: `${error.message} - Flushing redis manually`,
service: SERVICE_NAME,
method: "flushQueues",
});
return await this.jobQueueHelper.flushRedis();
}
}
async shutdown() {
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
this.healthCheckInterval = null;
}
for (const worker of this.workers) {
await worker.close();
}
for (const queue of Object.values(this.queues)) {
await queue.obliterate();
}
}
}
export default JobQueue;