mirror of
https://github.com/bluewave-labs/Checkmate.git
synced 2026-01-20 08:39:43 -06:00
Merge pull request #1035 from bluewave-labs/feat/be/job-queue-tests
Feat/be/job queue tests
This commit is contained in:
@@ -36,7 +36,7 @@ import { useSelector } from "react-redux";
|
||||
import { CssBaseline } from "@mui/material";
|
||||
import { useEffect } from "react";
|
||||
import { useDispatch } from "react-redux";
|
||||
import { getAppSettings } from "./Features/Settings/settingsSlice";
|
||||
import { getAppSettings, updateAppSettings } from "./Features/Settings/settingsSlice";
|
||||
import { logger } from "./Utils/Logger"; // Import the logger
|
||||
import { networkService } from "./main";
|
||||
function App() {
|
||||
@@ -66,6 +66,21 @@ function App() {
|
||||
};
|
||||
}, []);
|
||||
|
||||
useEffect(() => {
|
||||
const thing = async () => {
|
||||
const action = await dispatch(
|
||||
updateAppSettings({ authToken, settings: { apiBaseUrl: "test" } })
|
||||
);
|
||||
|
||||
if (action.payload.success) {
|
||||
console.log(action.payload.data);
|
||||
} else {
|
||||
console.log(action);
|
||||
}
|
||||
};
|
||||
thing();
|
||||
}, [dispatch, authToken]);
|
||||
|
||||
return (
|
||||
<ThemeProvider theme={mode === "light" ? lightTheme : darkTheme}>
|
||||
<CssBaseline />
|
||||
|
||||
@@ -49,6 +49,7 @@ export const updateAppSettings = createAsyncThunk(
|
||||
systemEmailAddress: settings.systemEmailAddress,
|
||||
systemEmailPassword: settings.systemEmailPassword,
|
||||
};
|
||||
console.log(parsedSettings);
|
||||
const res = await networkService.updateAppSettings({
|
||||
settings: parsedSettings,
|
||||
authToken,
|
||||
|
||||
@@ -13,6 +13,7 @@ class NetworkService {
|
||||
this.setBaseUrl(baseURL);
|
||||
this.unsubscribe = store.subscribe(() => {
|
||||
const state = store.getState();
|
||||
console.log(state.settings.apiBaseUrl);
|
||||
if (BASE_URL !== undefined) {
|
||||
baseURL = BASE_URL;
|
||||
} else if (state?.settings?.apiBaseUrl ?? null) {
|
||||
@@ -87,48 +88,48 @@ class NetworkService {
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* ************************************
|
||||
* Check the endpoint resolution
|
||||
* ************************************
|
||||
*
|
||||
* @async
|
||||
* @param {Object} config - The configuration object.
|
||||
* @param {string} config.authToken - The authorization token to be used in the request header.
|
||||
* @param {Object} config.monitorURL - The monitor url to be sent in the request body.
|
||||
* @returns {Promise<AxiosResponse>} The response from the axios POST request.
|
||||
*/
|
||||
async checkEndpointResolution(config) {
|
||||
const { authToken, monitorURL } = config;
|
||||
const params = new URLSearchParams();
|
||||
|
||||
if (monitorURL) params.append("monitorURL", monitorURL);
|
||||
|
||||
return this.axiosInstance.get(`/monitors/resolution/url?${params.toString()}`, {
|
||||
headers: {
|
||||
Authorization: `Bearer ${authToken}`,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
})
|
||||
}
|
||||
/**
|
||||
*
|
||||
* ************************************
|
||||
* Check the endpoint resolution
|
||||
* ************************************
|
||||
*
|
||||
* @async
|
||||
* @param {Object} config - The configuration object.
|
||||
* @param {string} config.authToken - The authorization token to be used in the request header.
|
||||
* @param {Object} config.monitorURL - The monitor url to be sent in the request body.
|
||||
* @returns {Promise<AxiosResponse>} The response from the axios POST request.
|
||||
*/
|
||||
async checkEndpointResolution(config) {
|
||||
const { authToken, monitorURL } = config;
|
||||
const params = new URLSearchParams();
|
||||
|
||||
/**
|
||||
*
|
||||
* ************************************
|
||||
* Gets monitors and summary of stats by TeamID
|
||||
* ************************************
|
||||
*
|
||||
* @async
|
||||
* @param {Object} config - The configuration object.
|
||||
* @param {string} config.authToken - The authorization token to be used in the request header.
|
||||
* @param {string} config.teamId - Team ID
|
||||
* @param {Array<string>} config.types - Array of monitor types
|
||||
* @returns {Promise<AxiosResponse>} The response from the axios POST request.
|
||||
*/
|
||||
async getMonitorsAndSummaryByTeamId(config) {
|
||||
const params = new URLSearchParams();
|
||||
if (monitorURL) params.append("monitorURL", monitorURL);
|
||||
|
||||
return this.axiosInstance.get(`/monitors/resolution/url?${params.toString()}`, {
|
||||
headers: {
|
||||
Authorization: `Bearer ${authToken}`,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* ************************************
|
||||
* Gets monitors and summary of stats by TeamID
|
||||
* ************************************
|
||||
*
|
||||
* @async
|
||||
* @param {Object} config - The configuration object.
|
||||
* @param {string} config.authToken - The authorization token to be used in the request header.
|
||||
* @param {string} config.teamId - Team ID
|
||||
* @param {Array<string>} config.types - Array of monitor types
|
||||
* @returns {Promise<AxiosResponse>} The response from the axios POST request.
|
||||
*/
|
||||
async getMonitorsAndSummaryByTeamId(config) {
|
||||
const params = new URLSearchParams();
|
||||
|
||||
if (config.types) {
|
||||
config.types.forEach((type) => {
|
||||
|
||||
@@ -19,7 +19,10 @@ import { fileURLToPath } from "url";
|
||||
|
||||
import { connectDbAndRunServer } from "./configs/db.js";
|
||||
import queueRouter from "./routes/queueRoute.js";
|
||||
|
||||
//JobQueue service and dependencies
|
||||
import JobQueue from "./service/jobQueue.js";
|
||||
import { Queue, Worker } from "bullmq";
|
||||
|
||||
//Network service and dependencies
|
||||
import NetworkService from "./service/networkService.js";
|
||||
@@ -157,7 +160,14 @@ const startApp = async () => {
|
||||
logger
|
||||
);
|
||||
const networkService = new NetworkService(db, emailService, axios, ping, logger, http);
|
||||
const jobQueue = await JobQueue.createJobQueue(db, networkService, settingsService);
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
Queue,
|
||||
Worker
|
||||
);
|
||||
|
||||
const cleanup = async () => {
|
||||
if (cleaningUp) {
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
import { Queue, Worker, Job } from "bullmq";
|
||||
const QUEUE_NAME = "monitors";
|
||||
|
||||
const JOBS_PER_WORKER = 5;
|
||||
import logger from "../utils/logger.js";
|
||||
import { errorMessages, successMessages } from "../utils/messages.js";
|
||||
const SERVICE_NAME = "JobQueue";
|
||||
/**
|
||||
@@ -19,11 +16,13 @@ class JobQueue {
|
||||
* @param {SettingsService} settingsService - The settings service
|
||||
* @throws {Error}
|
||||
*/
|
||||
constructor(settingsService) {
|
||||
const { redisHost, redisPort } = settingsService.getSettings();
|
||||
constructor(settingsService, logger, Queue, Worker) {
|
||||
const settings = settingsService.getSettings() || {};
|
||||
|
||||
const { redisHost = "127.0.0.1", redisPort = 6379 } = settings;
|
||||
const connection = {
|
||||
host: redisHost || "127.0.0.1",
|
||||
port: redisPort || 6379,
|
||||
host: redisHost,
|
||||
port: redisPort,
|
||||
};
|
||||
this.connection = connection;
|
||||
this.queue = new Queue(QUEUE_NAME, {
|
||||
@@ -33,6 +32,8 @@ class JobQueue {
|
||||
this.db = null;
|
||||
this.networkService = null;
|
||||
this.settingsService = settingsService;
|
||||
this.logger = logger;
|
||||
this.Worker = Worker;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -42,8 +43,15 @@ class JobQueue {
|
||||
* @returns {Promise<JobQueue>} - Returns a new JobQueue
|
||||
*
|
||||
*/
|
||||
static async createJobQueue(db, networkService, settingsService) {
|
||||
const queue = new JobQueue(settingsService);
|
||||
static async createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
Queue,
|
||||
Worker
|
||||
) {
|
||||
const queue = new JobQueue(settingsService, logger, Queue, Worker);
|
||||
try {
|
||||
queue.db = db;
|
||||
queue.networkService = networkService;
|
||||
@@ -69,7 +77,7 @@ class JobQueue {
|
||||
* @returns {Worker} The newly created worker
|
||||
*/
|
||||
createWorker() {
|
||||
const worker = new Worker(
|
||||
const worker = new this.Worker(
|
||||
QUEUE_NAME,
|
||||
async (job) => {
|
||||
try {
|
||||
@@ -96,17 +104,16 @@ class JobQueue {
|
||||
}
|
||||
return acc;
|
||||
}, false);
|
||||
|
||||
if (!maintenanceWindowActive) {
|
||||
await this.networkService.getStatus(job);
|
||||
} else {
|
||||
logger.info(`Monitor ${monitorId} is in maintenance window`, {
|
||||
this.logger.info(`Monitor ${monitorId} is in maintenance window`, {
|
||||
service: SERVICE_NAME,
|
||||
monitorId,
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`Error processing job ${job.id}: ${error.message}`, {
|
||||
this.logger.error(`Error processing job ${job.id}: ${error.message}`, {
|
||||
service: SERVICE_NAME,
|
||||
jobId: job.id,
|
||||
error: error,
|
||||
@@ -169,11 +176,9 @@ class JobQueue {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
if (workerStats.load > JOBS_PER_WORKER) {
|
||||
// Find out how many more jobs we have than current workers can handle
|
||||
const excessJobs = workerStats.jobs.length - this.workers.length * JOBS_PER_WORKER;
|
||||
|
||||
// Divide by jobs/worker to find out how many workers to add
|
||||
const workersToAdd = Math.ceil(excessJobs / JOBS_PER_WORKER);
|
||||
for (let i = 0; i < workersToAdd; i++) {
|
||||
@@ -188,18 +193,17 @@ class JobQueue {
|
||||
const workerCapacity = this.workers.length * JOBS_PER_WORKER;
|
||||
const excessCapacity = workerCapacity - workerStats.jobs.length;
|
||||
// Calculate how many workers to remove
|
||||
const workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER);
|
||||
if (this.workers.length > 5) {
|
||||
for (let i = 0; i < workersToRemove; i++) {
|
||||
const worker = this.workers.pop();
|
||||
try {
|
||||
await worker.close();
|
||||
} catch (error) {
|
||||
// Catch the error instead of throwing it
|
||||
logger.error(errorMessages.JOB_QUEUE_WORKER_CLOSE, {
|
||||
service: SERVICE_NAME,
|
||||
});
|
||||
}
|
||||
let workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER); // Make sure there are always at least 5
|
||||
while (workersToRemove > 0 && this.workers.length > 5) {
|
||||
const worker = this.workers.pop();
|
||||
workersToRemove--;
|
||||
try {
|
||||
await worker.close();
|
||||
} catch (error) {
|
||||
// Catch the error instead of throwing it
|
||||
this.logger.error(errorMessages.JOB_QUEUE_WORKER_CLOSE, {
|
||||
service: SERVICE_NAME,
|
||||
});
|
||||
}
|
||||
}
|
||||
return true;
|
||||
@@ -282,14 +286,14 @@ class JobQueue {
|
||||
every: monitor.interval,
|
||||
});
|
||||
if (deleted) {
|
||||
logger.info(successMessages.JOB_QUEUE_DELETE_JOB, {
|
||||
this.logger.info(successMessages.JOB_QUEUE_DELETE_JOB, {
|
||||
service: SERVICE_NAME,
|
||||
jobId: monitor.id,
|
||||
});
|
||||
const workerStats = await this.getWorkerStats();
|
||||
await this.scaleWorkers(workerStats);
|
||||
} else {
|
||||
logger.error(errorMessages.JOB_QUEUE_DELETE_JOB, {
|
||||
this.logger.error(errorMessages.JOB_QUEUE_DELETE_JOB, {
|
||||
service: SERVICE_NAME,
|
||||
jobId: monitor.id,
|
||||
});
|
||||
@@ -311,9 +315,10 @@ class JobQueue {
|
||||
delayed: await this.queue.getDelayedCount(),
|
||||
repeatableJobs: (await this.queue.getRepeatableJobs()).length,
|
||||
};
|
||||
console.log(metrics);
|
||||
return metrics;
|
||||
} catch (error) {
|
||||
logger.error("Failed to retrieve job queue metrics", {
|
||||
this.logger.error("Failed to retrieve job queue metrics", {
|
||||
service: SERVICE_NAME,
|
||||
errorMsg: error.message,
|
||||
});
|
||||
@@ -344,7 +349,7 @@ class JobQueue {
|
||||
await this.queue.obliterate();
|
||||
metrics = await this.getMetrics();
|
||||
console.log(metrics);
|
||||
logger.info(successMessages.JOB_QUEUE_OBLITERATE, {
|
||||
this.logger.info(successMessages.JOB_QUEUE_OBLITERATE, {
|
||||
service: SERVICE_NAME,
|
||||
});
|
||||
return true;
|
||||
|
||||
738
Server/tests/services/jobQueue.test.js
Normal file
738
Server/tests/services/jobQueue.test.js
Normal file
@@ -0,0 +1,738 @@
|
||||
import sinon from "sinon";
|
||||
import JobQueue from "../../service/jobQueue.js";
|
||||
import { log } from "console";
|
||||
|
||||
class QueueStub {
|
||||
constructor(queueName, options) {
|
||||
this.queueName = queueName;
|
||||
this.options = options;
|
||||
this.workers = [];
|
||||
this.jobs = [];
|
||||
}
|
||||
|
||||
// Add any methods that are expected to be called on the Queue instance
|
||||
add(job) {
|
||||
this.jobs.push(job);
|
||||
}
|
||||
|
||||
removeRepeatable(id) {
|
||||
const removedJob = this.jobs.find((job) => job.data._id === id);
|
||||
this.jobs = this.jobs.filter((job) => job.data._id !== id);
|
||||
return removedJob;
|
||||
}
|
||||
|
||||
getRepeatableJobs() {
|
||||
return this.jobs;
|
||||
}
|
||||
async getJobs() {
|
||||
return this.jobs;
|
||||
}
|
||||
}
|
||||
|
||||
class WorkerStub {
|
||||
constructor(QUEUE_NAME, workerTask) {
|
||||
this.queueName = QUEUE_NAME;
|
||||
this.workerTask = async () => workerTask({ data: { _id: 1 } });
|
||||
}
|
||||
|
||||
async close() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
describe("JobQueue", () => {
|
||||
let settingsService, logger, db, networkService;
|
||||
|
||||
beforeEach(() => {
|
||||
settingsService = { getSettings: sinon.stub() };
|
||||
logger = { error: sinon.stub(), info: sinon.stub() };
|
||||
db = {
|
||||
getAllMonitors: sinon.stub().returns([]),
|
||||
getMaintenanceWindowsByMonitorId: sinon.stub().returns([]),
|
||||
};
|
||||
networkService = { getStatus: sinon.stub() };
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
sinon.restore();
|
||||
});
|
||||
describe("createJobQueue", () => {
|
||||
it("should create a new JobQueue and add jobs for active monitors", async () => {
|
||||
db.getAllMonitors.returns([
|
||||
{ id: 1, isActive: true },
|
||||
{ id: 2, isActive: true },
|
||||
]);
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
// There should be double the jobs, as one is meant to be instantly executed
|
||||
// And one is meant to be enqueued
|
||||
expect(jobQueue.queue.jobs.length).to.equal(4);
|
||||
});
|
||||
|
||||
it("should reject with an error if an error occurs", async () => {
|
||||
db.getAllMonitors.throws("Error");
|
||||
try {
|
||||
await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
} catch (error) {
|
||||
expect(error.service).to.equal("JobQueue");
|
||||
expect(error.method).to.equal("createJobQueue");
|
||||
}
|
||||
});
|
||||
it("should reject with an error if an error occurs, should not overwrite error data", async () => {
|
||||
const error = new Error("Error");
|
||||
error.service = "otherService";
|
||||
error.method = "otherMethod";
|
||||
db.getAllMonitors.throws(error);
|
||||
|
||||
try {
|
||||
await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
} catch (error) {
|
||||
expect(error.service).to.equal("otherService");
|
||||
expect(error.method).to.equal("otherMethod");
|
||||
}
|
||||
});
|
||||
});
|
||||
describe("Constructor", () => {
|
||||
it("should construct a new JobQueue with default port and host if not provided", () => {
|
||||
settingsService.getSettings.returns({});
|
||||
const jobQueue = new JobQueue(settingsService, logger, QueueStub, WorkerStub);
|
||||
expect(jobQueue.connection.host).to.equal("127.0.0.1");
|
||||
expect(jobQueue.connection.port).to.equal(6379);
|
||||
});
|
||||
it("should construct a new JobQueue with provided port and host", () => {
|
||||
settingsService.getSettings.returns({ redisHost: "localhost", redisPort: 1234 });
|
||||
const jobQueue = new JobQueue(settingsService, logger, QueueStub, WorkerStub);
|
||||
expect(jobQueue.connection.host).to.equal("localhost");
|
||||
expect(jobQueue.connection.port).to.equal(1234);
|
||||
});
|
||||
});
|
||||
|
||||
describe("createWorker", () => {
|
||||
it("should create a new worker", async () => {
|
||||
const jobQueue = new JobQueue(settingsService, logger, QueueStub, WorkerStub);
|
||||
const worker = jobQueue.createWorker();
|
||||
expect(worker).to.be.instanceOf(WorkerStub);
|
||||
});
|
||||
it("worker should handle a maintenanceWindow error", async () => {
|
||||
db.getMaintenanceWindowsByMonitorId.throws("Error");
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
const worker = jobQueue.createWorker();
|
||||
await worker.workerTask();
|
||||
expect(logger.error.calledOnce).to.be.true;
|
||||
});
|
||||
it("worker should handle a maintenanceWindow that is not active", async () => {
|
||||
db.getMaintenanceWindowsByMonitorId.returns([
|
||||
{ start: 123, end: 123, repeat: 123456 },
|
||||
]);
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
const worker = jobQueue.createWorker();
|
||||
await worker.workerTask();
|
||||
expect(networkService.getStatus.calledOnce).to.be.true;
|
||||
});
|
||||
it("worker should handle a maintenanceWindow that is active", async () => {
|
||||
db.getMaintenanceWindowsByMonitorId.returns([
|
||||
{
|
||||
active: true,
|
||||
start: new Date(Date.now() - 1000).toISOString(),
|
||||
end: new Date(Date.now() + 1000).toISOString(),
|
||||
repeat: 0,
|
||||
},
|
||||
]);
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
const worker = jobQueue.createWorker();
|
||||
await worker.workerTask();
|
||||
expect(networkService.getStatus.calledOnce).to.be.false;
|
||||
});
|
||||
it("worker should handle a maintenanceWindow that is active, has a repeat, but is not in maintenance zone", async () => {
|
||||
db.getMaintenanceWindowsByMonitorId.returns([
|
||||
{
|
||||
active: true,
|
||||
start: new Date(Date.now() - 10000).toISOString(),
|
||||
end: new Date(Date.now() + 5000).toISOString(),
|
||||
repeat: 10000,
|
||||
},
|
||||
]);
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
const worker = jobQueue.createWorker();
|
||||
await worker.workerTask();
|
||||
expect(networkService.getStatus.calledOnce).to.be.true;
|
||||
});
|
||||
});
|
||||
describe("getWorkerStats", () => {
|
||||
it("should throw an error if getRepeatable Jobs fails", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.getRepeatableJobs = async () => {
|
||||
throw new Error("Error");
|
||||
};
|
||||
try {
|
||||
const stats = await jobQueue.getWorkerStats();
|
||||
} catch (error) {
|
||||
expect(error.service).to.equal("JobQueue");
|
||||
expect(error.method).to.equal("getWorkerStats");
|
||||
}
|
||||
});
|
||||
it("should throw an error if getRepeatable Jobs fails but respect existing error data", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.getRepeatableJobs = async () => {
|
||||
const error = new Error("Existing Error");
|
||||
error.service = "otherService";
|
||||
error.method = "otherMethod";
|
||||
throw error;
|
||||
};
|
||||
try {
|
||||
await jobQueue.getWorkerStats();
|
||||
} catch (error) {
|
||||
expect(error.service).to.equal("otherService");
|
||||
expect(error.method).to.equal("otherMethod");
|
||||
}
|
||||
});
|
||||
});
|
||||
describe("scaleWorkers", () => {
|
||||
it("should scale workers to 5 if no workers", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
expect(jobQueue.workers.length).to.equal(5);
|
||||
});
|
||||
it("should scale workers up", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.scaleWorkers({
|
||||
load: 100,
|
||||
jobs: Array.from({ length: 100 }, (_, i) => i + 1),
|
||||
});
|
||||
expect(jobQueue.workers.length).to.equal(20);
|
||||
});
|
||||
it("should scale workers down, even with error of worker.close fails", async () => {
|
||||
WorkerStub.prototype.close = async () => {
|
||||
throw new Error("Error");
|
||||
};
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
await jobQueue.scaleWorkers({
|
||||
load: 100,
|
||||
jobs: Array.from({ length: 100 }, (_, i) => i + 1),
|
||||
});
|
||||
|
||||
const res = await jobQueue.scaleWorkers({
|
||||
load: 0,
|
||||
jobs: [],
|
||||
});
|
||||
expect(jobQueue.workers.length).to.equal(5);
|
||||
});
|
||||
it("should scale workers down", async () => {
|
||||
WorkerStub.prototype.close = async () => {
|
||||
return true;
|
||||
};
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
await jobQueue.scaleWorkers({
|
||||
load: 40,
|
||||
jobs: Array.from({ length: 40 }, (_, i) => i + 1),
|
||||
});
|
||||
|
||||
const res = await jobQueue.scaleWorkers({
|
||||
load: 0,
|
||||
jobs: [],
|
||||
});
|
||||
expect(jobQueue.workers.length).to.equal(5);
|
||||
});
|
||||
it("should return false if scaling doesn't happen", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
const res = await jobQueue.scaleWorkers({ load: 5 });
|
||||
expect(jobQueue.workers.length).to.equal(5);
|
||||
expect(res).to.be.false;
|
||||
});
|
||||
});
|
||||
|
||||
describe("getJobs", async () => {
|
||||
it("should return jobs", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
const jobs = await jobQueue.getJobs();
|
||||
expect(jobs.length).to.equal(0);
|
||||
});
|
||||
it("should throw an error if getRepeatableJobs fails", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
try {
|
||||
jobQueue.queue.getRepeatableJobs = async () => {
|
||||
throw new Error("error");
|
||||
};
|
||||
|
||||
await jobQueue.getJobs(true);
|
||||
} catch (error) {
|
||||
expect(error.service).to.equal("JobQueue");
|
||||
expect(error.method).to.equal("getJobs");
|
||||
}
|
||||
});
|
||||
it("should throw an error if getRepeatableJobs fails but respect existing error data", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
try {
|
||||
jobQueue.queue.getRepeatableJobs = async () => {
|
||||
const error = new Error("Existing error");
|
||||
error.service = "otherService";
|
||||
error.method = "otherMethod";
|
||||
throw error;
|
||||
};
|
||||
|
||||
await jobQueue.getJobs(true);
|
||||
} catch (error) {
|
||||
expect(error.service).to.equal("otherService");
|
||||
expect(error.method).to.equal("otherMethod");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("getJobStats", async () => {
|
||||
it("should return job stats for no jobs", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
const jobStats = await jobQueue.getJobStats();
|
||||
expect(jobStats).to.deep.equal({ jobs: [], workers: 5 });
|
||||
});
|
||||
it("should return job stats for jobs", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.getJobs = async () => {
|
||||
return [{ data: { url: "test" }, getState: async () => "completed" }];
|
||||
};
|
||||
const jobStats = await jobQueue.getJobStats();
|
||||
expect(jobStats).to.deep.equal({
|
||||
jobs: [{ url: "test", state: "completed" }],
|
||||
workers: 5,
|
||||
});
|
||||
});
|
||||
it("should reject with an error if mapping jobs fails", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.getJobs = async () => {
|
||||
return [
|
||||
{
|
||||
data: { url: "test" },
|
||||
getState: async () => {
|
||||
throw new Error("Mapping Error");
|
||||
},
|
||||
},
|
||||
];
|
||||
};
|
||||
try {
|
||||
await jobQueue.getJobStats();
|
||||
} catch (error) {
|
||||
expect(error.message).to.equal("Mapping Error");
|
||||
expect(error.service).to.equal("JobQueue");
|
||||
expect(error.method).to.equal("getJobStats");
|
||||
}
|
||||
});
|
||||
it("should reject with an error if mapping jobs fails but respect existing error data", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.getJobs = async () => {
|
||||
return [
|
||||
{
|
||||
data: { url: "test" },
|
||||
getState: async () => {
|
||||
const error = new Error("Mapping Error");
|
||||
error.service = "otherService";
|
||||
error.method = "otherMethod";
|
||||
throw error;
|
||||
},
|
||||
},
|
||||
];
|
||||
};
|
||||
try {
|
||||
await jobQueue.getJobStats();
|
||||
} catch (error) {
|
||||
expect(error.message).to.equal("Mapping Error");
|
||||
expect(error.service).to.equal("otherService");
|
||||
expect(error.method).to.equal("otherMethod");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("addJob", async () => {
|
||||
it("should add a job to the queue", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.addJob("test", { url: "test" });
|
||||
expect(jobQueue.queue.jobs.length).to.equal(1);
|
||||
});
|
||||
it("should reject with an error if adding fails", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.add = async () => {
|
||||
throw new Error("Error adding job");
|
||||
};
|
||||
try {
|
||||
await jobQueue.addJob("test", { url: "test" });
|
||||
} catch (error) {
|
||||
expect(error.message).to.equal("Error adding job");
|
||||
expect(error.service).to.equal("JobQueue");
|
||||
expect(error.method).to.equal("addJob");
|
||||
}
|
||||
});
|
||||
it("should reject with an error if adding fails but respect existing error data", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.add = async () => {
|
||||
const error = new Error("Error adding job");
|
||||
error.service = "otherService";
|
||||
error.method = "otherMethod";
|
||||
throw error;
|
||||
};
|
||||
try {
|
||||
await jobQueue.addJob("test", { url: "test" });
|
||||
} catch (error) {
|
||||
expect(error.message).to.equal("Error adding job");
|
||||
expect(error.service).to.equal("otherService");
|
||||
expect(error.method).to.equal("otherMethod");
|
||||
}
|
||||
});
|
||||
});
|
||||
describe("deleteJob", async () => {
|
||||
it("should delete a job from the queue", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.getWorkerStats = sinon.stub().returns({ load: 1, jobs: [{}] });
|
||||
jobQueue.scaleWorkers = sinon.stub();
|
||||
const monitor = { _id: 1 };
|
||||
const job = { data: monitor };
|
||||
jobQueue.queue.jobs = [job];
|
||||
await jobQueue.deleteJob(monitor);
|
||||
expect(jobQueue.queue.jobs.length).to.equal(0);
|
||||
expect(logger.info.calledOnce).to.be.true;
|
||||
expect(jobQueue.getWorkerStats.calledOnce).to.be.true;
|
||||
expect(jobQueue.scaleWorkers.calledOnce).to.be.true;
|
||||
});
|
||||
it("should log an error if job is not found", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.getWorkerStats = sinon.stub().returns({ load: 1, jobs: [{}] });
|
||||
jobQueue.scaleWorkers = sinon.stub();
|
||||
const monitor = { _id: 1 };
|
||||
const job = { data: monitor };
|
||||
jobQueue.queue.jobs = [job];
|
||||
await jobQueue.deleteJob({ id_: 2 });
|
||||
expect(logger.error.calledOnce).to.be.true;
|
||||
});
|
||||
it("should reject with an error if removeRepeatable fails", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.removeRepeatable = async () => {
|
||||
const error = new Error("removeRepeatable error");
|
||||
throw error;
|
||||
};
|
||||
|
||||
try {
|
||||
await jobQueue.deleteJob({ _id: 1 });
|
||||
} catch (error) {
|
||||
expect(error.message).to.equal("removeRepeatable error");
|
||||
expect(error.service).to.equal("JobQueue");
|
||||
expect(error.method).to.equal("deleteJob");
|
||||
}
|
||||
});
|
||||
it("should reject with an error if removeRepeatable fails but respect existing error data", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.removeRepeatable = async () => {
|
||||
const error = new Error("removeRepeatable error");
|
||||
error.service = "otherService";
|
||||
error.method = "otherMethod";
|
||||
throw error;
|
||||
};
|
||||
|
||||
try {
|
||||
await jobQueue.deleteJob({ _id: 1 });
|
||||
} catch (error) {
|
||||
expect(error.message).to.equal("removeRepeatable error");
|
||||
expect(error.service).to.equal("otherService");
|
||||
expect(error.method).to.equal("otherMethod");
|
||||
}
|
||||
});
|
||||
});
|
||||
describe("getMetrics", () => {
|
||||
it("should return metrics for the job queue", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.getWaitingCount = async () => 1;
|
||||
jobQueue.queue.getActiveCount = async () => 2;
|
||||
jobQueue.queue.getCompletedCount = async () => 3;
|
||||
jobQueue.queue.getFailedCount = async () => 4;
|
||||
jobQueue.queue.getDelayedCount = async () => 5;
|
||||
jobQueue.queue.getRepeatableJobs = async () => [1, 2, 3];
|
||||
const metrics = await jobQueue.getMetrics();
|
||||
expect(metrics).to.deep.equal({
|
||||
waiting: 1,
|
||||
active: 2,
|
||||
completed: 3,
|
||||
failed: 4,
|
||||
delayed: 5,
|
||||
repeatableJobs: 3,
|
||||
});
|
||||
});
|
||||
it("should log an error if metrics operations fail", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.getWaitingCount = async () => {
|
||||
throw new Error("Error");
|
||||
};
|
||||
await jobQueue.getMetrics();
|
||||
expect(logger.error.calledOnce).to.be.true;
|
||||
expect(logger.error.calledWith("Failed to retrieve job queue metrics")).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("obliterate", () => {
|
||||
it("should return true if obliteration is successful", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
jobQueue.queue.pause = async () => true;
|
||||
jobQueue.getJobs = async () => [{ key: 1, id: 1 }];
|
||||
jobQueue.queue.removeRepeatableByKey = async () => true;
|
||||
jobQueue.queue.remove = async () => true;
|
||||
jobQueue.queue.obliterate = async () => true;
|
||||
const obliteration = await jobQueue.obliterate();
|
||||
expect(obliteration).to.be.true;
|
||||
});
|
||||
it("should throw an error if obliteration fails", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
|
||||
jobQueue.getMetrics = async () => {
|
||||
throw new Error("Error");
|
||||
};
|
||||
|
||||
try {
|
||||
await jobQueue.obliterate();
|
||||
} catch (error) {
|
||||
expect(error.service).to.equal("JobQueue");
|
||||
expect(error.method).to.equal("obliterate");
|
||||
}
|
||||
});
|
||||
it("should throw an error if obliteration fails but respect existing error data", async () => {
|
||||
const jobQueue = await JobQueue.createJobQueue(
|
||||
db,
|
||||
networkService,
|
||||
settingsService,
|
||||
logger,
|
||||
QueueStub,
|
||||
WorkerStub
|
||||
);
|
||||
|
||||
jobQueue.getMetrics = async () => {
|
||||
const error = new Error("Error");
|
||||
error.service = "otherService";
|
||||
error.method = "otherMethod";
|
||||
throw error;
|
||||
};
|
||||
|
||||
try {
|
||||
await jobQueue.obliterate();
|
||||
} catch (error) {
|
||||
expect(error.service).to.equal("otherService");
|
||||
expect(error.method).to.equal("otherMethod");
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -308,43 +308,36 @@ describe("networkService - handlePing", () => {
|
||||
sinon.restore();
|
||||
});
|
||||
|
||||
it("should handle a successful ping response", async function () {
|
||||
it("should handle a successful ping response", async () => {
|
||||
const response = { alive: true };
|
||||
const responseTime = 0;
|
||||
pingMock.promise.probe.resolves(response);
|
||||
pingMock.promise.probe.returns(response);
|
||||
logAndStoreCheckStub.resolves();
|
||||
await networkService.handlePing(job);
|
||||
expect(
|
||||
logAndStoreCheckStub.calledOnceWith(
|
||||
{
|
||||
monitorId: job.data._id,
|
||||
status: true,
|
||||
responseTime,
|
||||
message: successMessages.PING_SUCCESS,
|
||||
},
|
||||
networkService.db.createCheck
|
||||
)
|
||||
logAndStoreCheckStub.calledOnceWith({
|
||||
monitorId: job.data._id,
|
||||
status: response.alive,
|
||||
responseTime,
|
||||
message: successMessages.PING_SUCCESS,
|
||||
})
|
||||
).to.be.true;
|
||||
expect(handleStatusUpdateStub.calledOnceWith(job, true)).to.be.true;
|
||||
});
|
||||
it("should handle a successful ping response and isAlive === false", async function () {
|
||||
it("should handle a successful response and isAlive === false", async function () {
|
||||
const response = { alive: false };
|
||||
const responseTime = 0;
|
||||
pingMock.promise.probe.resolves(response);
|
||||
logAndStoreCheckStub.resolves();
|
||||
|
||||
await networkService.handlePing(job);
|
||||
console.log(logAndStoreCheckStub.getCall(0).args[0]);
|
||||
expect(
|
||||
logAndStoreCheckStub.calledOnceWith(
|
||||
{
|
||||
monitorId: job.data._id,
|
||||
status: false,
|
||||
responseTime,
|
||||
message: errorMessages.PING_CANNOT_RESOLVE,
|
||||
},
|
||||
networkService.db.createCheck
|
||||
)
|
||||
logAndStoreCheckStub.calledOnceWith({
|
||||
monitorId: job.data._id,
|
||||
status: false,
|
||||
responseTime,
|
||||
message: errorMessages.PING_CANNOT_RESOLVE,
|
||||
})
|
||||
).to.be.true;
|
||||
expect(handleStatusUpdateStub.calledOnceWith(job, false)).to.be.true;
|
||||
});
|
||||
|
||||
@@ -84,7 +84,6 @@ describe("SettingsService", () => {
|
||||
try {
|
||||
await settingsService.loadSettings();
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
expect(error.message).to.equal("Test error");
|
||||
expect(error.service).to.equal("OTHER_SERVICE");
|
||||
expect(error.method).to.equal("otherMethod");
|
||||
|
||||
Reference in New Issue
Block a user