Added more logging, improved cleanup

This commit is contained in:
Alex Holliday
2024-08-06 13:52:48 -07:00
parent 9da7c00bd3
commit c11d5f4d5a
3 changed files with 52 additions and 15 deletions
+1 -1
View File
@@ -4,7 +4,7 @@ const SERVICE_NAME = "JobQueue";
const getJobs = async (req, res, next) => {
try {
const jobs = await req.jobQueue.getJobs();
const jobs = await req.jobQueue.getJobStats();
return res.status(200).json({ jobs });
} catch (error) {
error.service = SERVICE_NAME;
+7
View File
@@ -19,6 +19,8 @@ const JobQueue = require("./service/jobQueue");
const EmailService = require("./service/emailService");
const PageSpeedService = require("./service/pageSpeedService");
let cleaningUp = false;
// Need to wrap server setup in a function to handle async nature of JobQueue
const startApp = async () => {
// **************************
@@ -117,6 +119,11 @@ const startApp = async () => {
const pageSpeedService = new PageSpeedService();
const cleanup = async () => {
if (cleaningUp) {
console.log("Already cleaning up");
return;
}
cleaningUp = true;
console.log("Shutting down gracefully");
await jobQueue.obliterate();
console.log("Finished cleanup");
+44 -14
View File
@@ -124,6 +124,8 @@ class JobQueue {
const load = jobs.length / this.workers.length;
return { jobs, load };
} catch (error) {
console.log(error);
throw error;
}
}
@@ -143,8 +145,10 @@ class JobQueue {
async scaleWorkers(workerStats) {
if (this.workers.length === 0) {
// There are no workers, need to add one
const worker = this.createWorker();
this.workers.push(worker);
for (let i = 0; i < 5; i++) {
const worker = this.createWorker();
this.workers.push(worker);
}
return true;
}
@@ -168,15 +172,17 @@ class JobQueue {
const excessCapacity = workerCapacity - workerStats.jobs.length;
// Calculate how many workers to remove
const workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER);
for (let i = 0; i < workersToRemove; i++) {
const worker = this.workers.pop();
try {
await worker.close();
} catch (error) {
// Catch the error instead of throwing it
logger.error(errorMessages.JOB_QUEUE_WORKER_CLOSE, {
service: SERVICE_NAME,
});
if (this.workers.length > 5) {
for (let i = 0; i < workersToRemove; i++) {
const worker = this.workers.pop();
try {
await worker.close();
} catch (error) {
// Catch the error instead of throwing it
logger.error(errorMessages.JOB_QUEUE_WORKER_CLOSE, {
service: SERVICE_NAME,
});
}
}
}
return true;
@@ -196,6 +202,25 @@ class JobQueue {
const jobs = await this.queue.getRepeatableJobs();
return jobs;
} catch (error) {
console.log(error);
throw error;
}
}
async getJobStats() {
try {
const jobs = await this.queue.getJobs();
const ret = await Promise.all(
jobs.map(async (job) => {
const state = await job.getState();
return { url: job.data.url, state };
})
);
return { jobs: ret, workers: this.workers.length };
} catch (error) {
console.log(error);
throw error;
}
}
@@ -210,6 +235,7 @@ class JobQueue {
*/
async addJob(jobName, payload) {
try {
console.log("Adding job", payload.url);
// Execute job immediately
await this.queue.add(jobName, payload);
@@ -221,6 +247,7 @@ class JobQueue {
const workerStats = await this.getWorkerStats();
await this.scaleWorkers(workerStats);
} catch (error) {
console.log(error);
throw error;
}
}
@@ -264,9 +291,12 @@ class JobQueue {
await this.queue.removeRepeatableByKey(job.key);
await this.queue.remove(job.id);
}
this.workers.forEach(async (worker) => {
await worker.close();
});
await Promise.all(
this.workers.map(async (worker) => {
await worker.close();
})
);
await this.queue.obliterate();
logger.info(successMessages.JOB_QUEUE_OBLITERATE, {
service: SERVICE_NAME,