Merge pull request #100 from bluewave-labs/feat/job-queue

Feat/job queue
This commit is contained in:
Veysel
2024-06-05 17:17:27 -04:00
committed by GitHub
7 changed files with 979 additions and 209 deletions

View File

@@ -76,6 +76,8 @@ Configure the server with the following environmental variables:
| PORT | Optional | `integer` | Specifies Port for Server | |
| SENDGRID_API_KEY | Required | `string` | Specifies API KEY for SendGrid email service | |
| SYSTEM_EMAIL_ADDRESS | Required | `string` | Specifies System email to be used in emailing service, must be a verified email by sendgrid | |
| REDIS_HOST | Required | `string` | Host address for Redis database | |
| REDIS_PORT | Required | `integer` | Port for Redis database | |
---

View File

@@ -0,0 +1,42 @@
const express = require("express");
const logger = require("../utils/logger");
const SERVICE_NAME = "JobQueue";
const getJobs = async (req, res, next) => {
try {
const jobs = await req.jobQueue.getJobs();
return res.status(200).json({ jobs });
} catch (error) {
error.service = SERVICE_NAME;
next(error);
return;
}
};
const addJob = async (req, res, next) => {
try {
await req.jobQueue.addJob(Math.random().toString(36).substring(7));
return res.send("Added job");
} catch (error) {
error.service = SERVICE_NAME;
next(error);
return;
}
};
const obliterateQueue = async (req, res, next) => {
try {
const obliterated = await req.jobQueue.obliterate();
return res.status(200).send("Obliterated jobs");
} catch (error) {
error.service = SERVICE_NAME;
next(error);
return;
}
};
module.exports = {
getJobs,
addJob,
obliterateQueue,
};

View File

@@ -10,72 +10,91 @@ require("dotenv").config();
const logger = require("./utils/logger");
const { verifyJWT } = require("./middleware/verifyJWT");
const { handleErrors } = require("./middleware/handleErrors");
const queueRouter = require("./routes/queueRoute");
const JobQueue = require("./service/jobQueue");
// **************************
// Here is where we can swap out DBs easily. Spin up a mongoDB instance and try it out.
// Simply comment out the FakeDB and uncomment the MongoDB or vice versa.
// We can easily swap between any type of data source as long as the methods are implemented
//
// FakeDB
// const db = require("./db/FakeDb");
//
// MongoDB
// const db = require("./db/MongoDB");
//
// **************************
// Need to wrap server setup in a function to handle async nature of JobQueue
const startApp = async () => {
// const { sendEmail } = require('./utils/sendEmail')
const DB_TYPE = {
MongoDB: () => require("./db/MongoDB"),
FakedDB: () => require("./db/FakeDb"),
// **************************
// Here is where we can swap out DBs easily. Spin up a mongoDB instance and try it out.
// Simply comment out the FakeDB and uncomment the MongoDB or vice versa.
// We can easily swap between any type of data source as long as the methods are implemented
//
// FakeDB
// const db = require("./db/FakeDb");
//
// MongoDB
// const db = require("./db/MongoDB");
//
// **************************
const DB_TYPE = {
MongoDB: () => require("./db/MongoDB"),
FakedDB: () => require("./db/FakeDb"),
};
const db = DB_TYPE[process.env.DB_TYPE]
? DB_TYPE[process.env.DB_TYPE]()
: require("./db/FakeDb");
const jobQueue = await JobQueue.createJobQueue();
/**
* NOTES
* Email Service will be added
* Logger Service will be added (Winston or similar)
*/
const app = express();
// middlewares
app.use(
cors()
//We will add configuration later
);
app.use(express.json());
app.use(helmet());
// **************************
// Make DB accessible anywhere we have a Request object
// By adding the DB to the request object, we can access it in any route
// Thus we do not need to import it in every route file, and we can easily swap out DBs as there is only one place to change it
// Same applies for JobQueue
// **************************
app.use((req, res, next) => {
req.db = db;
req.jobQueue = jobQueue;
next();
});
//routes
app.use("/api/v1/auth", authRouter);
app.use("/api/v1/monitors", monitorRouter);
app.use("/api/v1/checks", verifyJWT, checkRouter);
app.use("/api/v1/alerts", verifyJWT, alertRouter);
//Temporary route for testing, remove later
app.use("/api/v1/job", queueRouter);
//health check
app.use("/api/v1/healthy", (req, res) => {
try {
logger.info("Checking Health of the server.");
return res.status(200).json({ message: "Healthy" });
} catch (error) {
logger.error(error.message);
return res.status(500).json({ message: error.message });
}
});
/**
* Error handler middleware
* Should be called last
*/
app.use(handleErrors);
connectDbAndRunServer(app, db);
};
const db = DB_TYPE[process.env.DB_TYPE]
? DB_TYPE[process.env.DB_TYPE]()
: require("./db/FakeDb");
const app = express();
// middlewares
app.use(
cors()
//We will add configuration later
);
app.use(express.json());
app.use(helmet());
// **************************
// Make DB accessible anywhere we have a Request object
// By adding the DB to the request object, we can access it in any route
// Thus we do not need to import it in every route file, and we can easily swap out DBs as there is only one place to change it
// **************************
app.use((req, res, next) => {
req.db = db;
next();
startApp().catch((error) => {
console.log(error);
});
//routes
app.use("/api/v1/auth", authRouter);
app.use("/api/v1/monitors", monitorRouter);
app.use("/api/v1/checks", verifyJWT, checkRouter);
app.use("/api/v1/alerts", verifyJWT, alertRouter);
//health check
app.use("/api/v1/healthy", (req, res) => {
try {
logger.info("Checking Health of the server.");
return res.status(200).json({ message: "Healthy" });
} catch (error) {
logger.error(error.message);
return res.status(500).json({ message: error.message });
}
});
/**
* Error handler middleware
* Should be called last
*/
app.use(handleErrors);
connectDbAndRunServer(app, db);

788
Server/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -13,6 +13,7 @@
"dependencies": {
"@sendgrid/mail": "^8.1.3",
"bcrypt": "^5.1.1",
"bullmq": "5.7.15",
"cors": "^2.8.5",
"dotenv": "^16.4.5",
"express": "^4.19.2",

View File

@@ -0,0 +1,13 @@
const router = require("express").Router();
const queueController = require("../controllers/queueController");
// Get Jobs
router.get("/", queueController.getJobs);
// Add Job
router.post("/", queueController.addJob);
// Obliterate Queue
router.post("/obliterate", queueController.obliterateQueue);
module.exports = router;

195
Server/service/jobQueue.js Normal file
View File

@@ -0,0 +1,195 @@
const { Queue, Worker, Job } = require("bullmq");
const QUEUE_NAME = "monitors";
const connection = {
host: process.env.REDIS_HOST || "127.0.0.1",
port: process.env.REDIS_PORT || 6379,
};
const JOBS_PER_WORKER = 5;
const logger = require("../utils/logger");
class JobQueue {
/**
* Constructs a new JobQueue
* @constructor
* @throws {Error}
*/
constructor() {
console.log(process.env.REDIS_PORT);
this.queue = new Queue(QUEUE_NAME, {
connection,
});
this.workers = [];
}
/**
* Static factory method to create a JobQueue
* @static
* @async
* @returns {Promise<JobQueue>} - Returns a new JobQueue
*
*/
static async createJobQueue() {
const queue = new JobQueue();
try {
const workerStats = await queue.getWorkerStats();
await queue.scaleWorkers(workerStats);
return queue;
} catch (error) {
throw error;
}
}
/**
* Creates a worker for the queue
* Operations are carried out in the async callback
* @returns {Worker} The newly created worker
*/
createWorker() {
const worker = new Worker(
QUEUE_NAME,
async (job) => {
// TODO Ping a monitor
console.log(`${job.name} completed, workers: ${this.workers.length}`);
},
{
connection,
}
);
return worker;
}
/**
* @typedef {Object} WorkerStats
* @property {Array<Job>} jobs - Array of jobs in the Queue
* @property {number} - workerLoad - The number of jobs per worker
*
*/
/**
* Gets stats related to the workers
* This is used for scaling workers right now
* In the future we will likely want to scale based on server performance metrics
* CPU Usage & memory usage, if too high, scale down workers.
* When to scale up? If jobs are taking too long to complete?
* @async
* @returns {Promise<WorkerStats>} - Returns the worker stats
*/
async getWorkerStats() {
try {
const jobs = await this.queue.getRepeatableJobs();
const load = jobs.length / this.workers.length;
return { jobs, load };
} catch (error) {
throw error;
}
}
/**
* Scale Workers
* This function scales workers based on the load per worker
* If the load is higher than the JOBS_PER_WORKER threshold, we add more workers
* If the load is lower than the JOBS_PER_WORKER threshold, we release workers
* This approach ignores server performance, which we should add in the future
*
* @async
* @param {WorkerStats} workerStats - The payload for the job.
* @returns {Promise<boolean>}
*/
async scaleWorkers(workerStats) {
if (this.workers.length === 0) {
// There are no workers, need to add one
const worker = this.createWorker();
this.workers.push(worker);
return true;
}
if (workerStats.load > JOBS_PER_WORKER) {
// Find out how many more jobs we have than current workers can handle
const excessJobs =
workerStats.jobs.length - this.workers.length * JOBS_PER_WORKER;
// Divide by jobs/worker to find out how many workers to add
const workersToAdd = Math.ceil(excessJobs / JOBS_PER_WORKER);
for (let i = 0; i < workersToAdd; i++) {
const worker = this.createWorker();
this.workers.push(worker);
}
return true;
}
if (workerStats.load < JOBS_PER_WORKER) {
// Find out how much excess capacity we have
const workerCapacity = this.workers.length * JOBS_PER_WORKER;
const excessCapacity = workerCapacity - workerStats.jobs.length;
// Calculate how many workers to remove
const workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER);
for (let i = 0; i < workersToRemove; i++) {
const worker = this.workers.pop();
try {
await worker.close();
} catch (error) {
// Catch the error instead of throwing it
console.error("Error closing worker", error);
}
}
return true;
}
return false;
}
/**
* Gets all jobs in the queue.
*
* @async
* @returns {Promise<Array<Job>>}
* @throws {Error} - Throws error if getting jobs fails
*/
async getJobs() {
try {
const jobs = await this.queue.getRepeatableJobs();
console.log("jobs", jobs);
return jobs;
} catch (error) {
throw error;
}
}
/**
* Adds a job to the queue and scales workers based on worker stats.
*
* @async
* @param {string} jobName - The name of the job to be added.
* @param {Monitor} payload - The payload for the job.
* @throws {Error} - Will throw an error if the job cannot be added or workers don't scale
*/
async addJob(jobName, payload) {
try {
await this.queue.add(jobName, payload, {
repeat: {
every: 1000,
limit: 100,
},
});
const workerStats = await this.getWorkerStats();
await this.scaleWorkers(workerStats);
} catch (error) {
throw error;
}
}
/**
* @async
* @returns {Promise<boolean>} - Returns true if obliteration is successful
*/
async obliterate() {
try {
await this.queue.obliterate();
return true;
} catch (error) {
throw error;
}
}
}
module.exports = JobQueue;