Added metrics endpoint to queue, improved cleanup, improved logging

This commit is contained in:
Alex Holliday
2024-08-15 14:13:12 -07:00
parent 29b790a8e2
commit ea00cf07a1
4 changed files with 53 additions and 10 deletions

View File

@@ -1,7 +1,18 @@
const express = require("express");
const logger = require("../utils/logger");
const SERVICE_NAME = "JobQueue";
const getMetrics = async (req, res, next) => {
try {
const metrics = await req.jobQueue.getMetrics();
res
.status(200)
.json({ success: true, msg: "Metrics retrieved", data: metrics });
} catch (error) {
error.service = SERVICE_NAME;
next(error);
return;
}
};
const getJobs = async (req, res, next) => {
try {
const jobs = await req.jobQueue.getJobStats();
@@ -36,6 +47,7 @@ const obliterateQueue = async (req, res, next) => {
};
module.exports = {
getMetrics,
getJobs,
addJob,
obliterateQueue,

View File

@@ -5,7 +5,7 @@ require("dotenv").config();
const logger = require("./utils/logger");
const { verifyJWT } = require("./middleware/verifyJWT");
const { handleErrors } = require("./middleware/handleErrors");
const { errorMessages } = require("./utils/messages");
const authRouter = require("./routes/authRoute");
const monitorRouter = require("./routes/monitorRoute");
const checkRouter = require("./routes/checkRoute");
@@ -126,12 +126,19 @@ const startApp = async () => {
return;
}
cleaningUp = true;
console.log("Shutting down gracefully");
await jobQueue.obliterate();
console.log("Finished cleanup");
try {
console.log("Shutting down gracefully");
await jobQueue.obliterate();
console.log("Finished cleanup");
} catch (error) {
logger.error(errorMessages.JOB_QUEUE_DELETE_JOB, {
service: SERVICE_NAME,
errorMsg: error.message,
});
}
process.exit(0);
};
process.on("SIGUSR2", cleanup);
process.on("SIGINT", cleanup);
process.on("SIGTERM", cleanup);
};

View File

@@ -1,6 +1,8 @@
const router = require("express").Router();
const queueController = require("../controllers/queueController");
router.get("/metrics", queueController.getMetrics);
// Get Jobs
router.get("/", queueController.getJobs);

View File

@@ -280,13 +280,36 @@ class JobQueue {
}
}
async getMetrics() {
try {
const metrics = {
waiting: await this.queue.getWaitingCount(),
active: await this.queue.getActiveCount(),
completed: await this.queue.getCompletedCount(),
failed: await this.queue.getFailedCount(),
delayed: await this.queue.getDelayedCount(),
repeatableJobs: (await this.queue.getRepeatableJobs()).length,
};
return metrics;
} catch (error) {
logger.error("Failed to retrieve job queue metrics", {
service: SERVICE_NAME,
errorMsg: error.message,
});
}
}
/**
* @async
* @returns {Promise<boolean>} - Returns true if obliteration is successful
*/
async obliterate() {
try {
let metrics = await this.getMetrics();
console.log(metrics);
await this.queue.pause();
const jobs = await this.getJobs();
for (const job of jobs) {
await this.queue.removeRepeatableByKey(job.key);
await this.queue.remove(job.id);
@@ -298,17 +321,16 @@ class JobQueue {
);
await this.queue.obliterate();
metrics = await this.getMetrics();
console.log(metrics);
logger.info(successMessages.JOB_QUEUE_OBLITERATE, {
service: SERVICE_NAME,
});
return true;
} catch (error) {
logger.error(errorMessages.JOB_QUEUE_OBLITERATE);
throw error;
}
}
//TODO Cleanup Queue on shutdown
}
module.exports = JobQueue;