Update tests for jobQueue

This commit is contained in:
Alex Holliday
2024-10-28 14:20:51 +08:00
parent 5d6cfb16bc
commit 3a2bdbbffa

View File

@@ -44,21 +44,41 @@ class WorkerStub {
}
describe("JobQueue", () => {
let settingsService, logger, db, networkService;
let settingsService,
logger,
db,
networkService,
statusService,
notificationService,
jobQueue;
beforeEach(() => {
beforeEach(async () => {
settingsService = { getSettings: sinon.stub() };
statusService = { updateStatus: sinon.stub() };
notificationService = { handleNotifications: sinon.stub() };
logger = { error: sinon.stub(), info: sinon.stub() };
db = {
getAllMonitors: sinon.stub().returns([]),
getMaintenanceWindowsByMonitorId: sinon.stub().returns([]),
};
networkService = { getStatus: sinon.stub() };
jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
});
afterEach(() => {
sinon.restore();
});
describe("createJobQueue", () => {
it("should create a new JobQueue and add jobs for active monitors", async () => {
db.getAllMonitors.returns([
@@ -68,6 +88,8 @@ describe("JobQueue", () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -81,9 +103,11 @@ describe("JobQueue", () => {
it("should reject with an error if an error occurs", async () => {
db.getAllMonitors.throws("Error");
try {
await JobQueue.createJobQueue(
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -94,6 +118,7 @@ describe("JobQueue", () => {
expect(error.method).to.equal("createJobQueue");
}
});
it("should reject with an error if an error occurs, should not overwrite error data", async () => {
const error = new Error("Error");
error.service = "otherService";
@@ -101,9 +126,11 @@ describe("JobQueue", () => {
db.getAllMonitors.throws(error);
try {
await JobQueue.createJobQueue(
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -115,58 +142,53 @@ describe("JobQueue", () => {
}
});
});
describe("Constructor", () => {
it("should construct a new JobQueue with default port and host if not provided", () => {
it("should construct a new JobQueue with default port and host if not provided", async () => {
settingsService.getSettings.returns({});
const jobQueue = new JobQueue(settingsService, logger, QueueStub, WorkerStub);
expect(jobQueue.connection.host).to.equal("127.0.0.1");
expect(jobQueue.connection.port).to.equal(6379);
});
it("should construct a new JobQueue with provided port and host", () => {
it("should construct a new JobQueue with provided port and host", async () => {
settingsService.getSettings.returns({ redisHost: "localhost", redisPort: 1234 });
const jobQueue = new JobQueue(settingsService, logger, QueueStub, WorkerStub);
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
expect(jobQueue.connection.host).to.equal("localhost");
expect(jobQueue.connection.port).to.equal(1234);
});
});
describe("createWorker", () => {
it("should create a new worker", async () => {
const jobQueue = new JobQueue(settingsService, logger, QueueStub, WorkerStub);
const worker = jobQueue.createWorker();
expect(worker).to.be.instanceOf(WorkerStub);
});
it("worker should handle a maintenanceWindow error", async () => {
describe("isMaintenanceWindow", () => {
it("should throw an error if error occurs", async () => {
db.getMaintenanceWindowsByMonitorId.throws("Error");
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
const worker = jobQueue.createWorker();
await worker.workerTask();
expect(logger.error.calledOnce).to.be.true;
try {
jobQueue.isInMaintenanceWindow(1);
} catch (error) {
expect(error.service).to.equal("JobQueue");
expect(error.method).to.equal("createWorker");
}
});
it("worker should handle a maintenanceWindow that is not active", async () => {
db.getMaintenanceWindowsByMonitorId.returns([
{ start: 123, end: 123, repeat: 123456 },
]);
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
const worker = jobQueue.createWorker();
await worker.workerTask();
expect(networkService.getStatus.calledOnce).to.be.true;
});
it("worker should handle a maintenanceWindow that is active", async () => {
it("should return true if in maintenance window with no repeat", async () => {
db.getMaintenanceWindowsByMonitorId.returns([
{
active: true,
@@ -178,42 +200,193 @@ describe("JobQueue", () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
const worker = jobQueue.createWorker();
await worker.workerTask();
expect(networkService.getStatus.calledOnce).to.be.false;
const inWindow = await jobQueue.isInMaintenanceWindow(1);
expect(inWindow).to.be.true;
});
it("worker should handle a maintenanceWindow that is active, has a repeat, but is not in maintenance zone", async () => {
it("should return true if in maintenance window with repeat", async () => {
db.getMaintenanceWindowsByMonitorId.returns([
{
active: true,
start: new Date(Date.now() - 10000).toISOString(),
end: new Date(Date.now() + 5000).toISOString(),
repeat: 10000,
end: new Date(Date.now() - 5000).toISOString(),
repeat: 1000,
},
]);
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
const worker = jobQueue.createWorker();
await worker.workerTask();
expect(networkService.getStatus.calledOnce).to.be.true;
const inWindow = await jobQueue.isInMaintenanceWindow(1);
expect(inWindow).to.be.true;
});
it("should return false if in end < start", async () => {
db.getMaintenanceWindowsByMonitorId.returns([
{
active: true,
start: new Date(Date.now() - 5000).toISOString(),
end: new Date(Date.now() - 10000).toISOString(),
repeat: 1000,
},
]);
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
const inWindow = await jobQueue.isInMaintenanceWindow(1);
expect(inWindow).to.be.false;
});
it("should return false if not in maintenance window", async () => {
db.getMaintenanceWindowsByMonitorId.returns([
{
active: false,
start: new Date(Date.now() - 5000).toISOString(),
end: new Date(Date.now() - 10000).toISOString(),
repeat: 1000,
},
]);
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
const inWindow = await jobQueue.isInMaintenanceWindow(1);
expect(inWindow).to.be.false;
});
});
describe("createJobHandler", () => {
it("resolve to an error if an error is thrown within", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.isInMaintenanceWindow = sinon.stub().throws("Error");
try {
const handler = jobQueue.createJobHandler();
await handler({ data: { _id: 1 } });
} catch (error) {
expect(error.service).to.equal("JobQueue");
expect(error.details).to.equal(`Error processing job 1: Error`);
}
});
it("should log info if job is in maintenance window", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.isInMaintenanceWindow = sinon.stub().returns(true);
const handler = jobQueue.createJobHandler();
await handler({ data: { _id: 1 } });
expect(logger.info.calledOnce).to.be.true;
expect(logger.info.firstCall.args[0].message).to.equal(
"Monitor 1 is in maintenance window"
);
});
it("should return if status has not changed", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.isInMaintenanceWindow = sinon.stub().returns(false);
statusService.updateStatus = sinon.stub().returns({ statusChanged: false });
const handler = jobQueue.createJobHandler();
await handler({ data: { _id: 1 } });
expect(jobQueue.notificationService.handleNotifications.notCalled).to.be.true;
});
it("should return if status has changed, but prevStatus was undefined (monitor paused)", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.isInMaintenanceWindow = sinon.stub().returns(false);
statusService.updateStatus = sinon
.stub()
.returns({ statusChanged: true, prevStatus: undefined });
const handler = jobQueue.createJobHandler();
await handler({ data: { _id: 1 } });
expect(jobQueue.notificationService.handleNotifications.notCalled).to.be.true;
});
it("should call notification service if status changed and monitor was not paused", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.isInMaintenanceWindow = sinon.stub().returns(false);
statusService.updateStatus = sinon
.stub()
.returns({ statusChanged: true, prevStatus: false });
const handler = jobQueue.createJobHandler();
await handler({ data: { _id: 1 } });
expect(jobQueue.notificationService.handleNotifications.calledOnce).to.be.true;
});
});
describe("getWorkerStats", () => {
it("should throw an error if getRepeatable Jobs fails", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -233,6 +406,8 @@ describe("JobQueue", () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -252,11 +427,14 @@ describe("JobQueue", () => {
}
});
});
describe("scaleWorkers", () => {
it("should scale workers to 5 if no workers", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -268,6 +446,8 @@ describe("JobQueue", () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -286,6 +466,8 @@ describe("JobQueue", () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -309,6 +491,8 @@ describe("JobQueue", () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -329,6 +513,8 @@ describe("JobQueue", () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -345,6 +531,8 @@ describe("JobQueue", () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -357,6 +545,8 @@ describe("JobQueue", () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -377,6 +567,8 @@ describe("JobQueue", () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
statusService,
notificationService,
settingsService,
logger,
QueueStub,
@@ -400,26 +592,10 @@ describe("JobQueue", () => {
describe("getJobStats", async () => {
it("should return job stats for no jobs", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
const jobStats = await jobQueue.getJobStats();
expect(jobStats).to.deep.equal({ jobs: [], workers: 5 });
});
it("should return job stats for jobs", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.queue.getJobs = async () => {
return [{ data: { url: "test" }, getState: async () => "completed" }];
};
@@ -430,14 +606,6 @@ describe("JobQueue", () => {
});
});
it("should reject with an error if mapping jobs fails", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.queue.getJobs = async () => {
return [
{
@@ -457,14 +625,6 @@ describe("JobQueue", () => {
}
});
it("should reject with an error if mapping jobs fails but respect existing error data", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.queue.getJobs = async () => {
return [
{
@@ -490,26 +650,10 @@ describe("JobQueue", () => {
describe("addJob", async () => {
it("should add a job to the queue", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.addJob("test", { url: "test" });
expect(jobQueue.queue.jobs.length).to.equal(1);
});
it("should reject with an error if adding fails", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.queue.add = async () => {
throw new Error("Error adding job");
};
@@ -522,14 +666,6 @@ describe("JobQueue", () => {
}
});
it("should reject with an error if adding fails but respect existing error data", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.queue.add = async () => {
const error = new Error("Error adding job");
error.service = "otherService";
@@ -547,14 +683,6 @@ describe("JobQueue", () => {
});
describe("deleteJob", async () => {
it("should delete a job from the queue", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.getWorkerStats = sinon.stub().returns({ load: 1, jobs: [{}] });
jobQueue.scaleWorkers = sinon.stub();
const monitor = { _id: 1 };
@@ -567,14 +695,6 @@ describe("JobQueue", () => {
// expect(jobQueue.scaleWorkers.calledOnce).to.be.true;
});
it("should log an error if job is not found", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.getWorkerStats = sinon.stub().returns({ load: 1, jobs: [{}] });
jobQueue.scaleWorkers = sinon.stub();
const monitor = { _id: 1 };
@@ -584,14 +704,6 @@ describe("JobQueue", () => {
expect(logger.error.calledOnce).to.be.true;
});
it("should reject with an error if removeRepeatable fails", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.queue.removeRepeatable = async () => {
const error = new Error("removeRepeatable error");
throw error;
@@ -606,14 +718,6 @@ describe("JobQueue", () => {
}
});
it("should reject with an error if removeRepeatable fails but respect existing error data", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.queue.removeRepeatable = async () => {
const error = new Error("removeRepeatable error");
error.service = "otherService";
@@ -632,14 +736,6 @@ describe("JobQueue", () => {
});
describe("getMetrics", () => {
it("should return metrics for the job queue", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.queue.getWaitingCount = async () => 1;
jobQueue.queue.getActiveCount = async () => 2;
jobQueue.queue.getCompletedCount = async () => 3;
@@ -657,14 +753,6 @@ describe("JobQueue", () => {
});
});
it("should log an error if metrics operations fail", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.queue.getWaitingCount = async () => {
throw new Error("Error");
};
@@ -676,14 +764,6 @@ describe("JobQueue", () => {
describe("obliterate", () => {
it("should return true if obliteration is successful", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.queue.pause = async () => true;
jobQueue.getJobs = async () => [{ key: 1, id: 1 }];
jobQueue.queue.removeRepeatableByKey = async () => true;
@@ -693,15 +773,6 @@ describe("JobQueue", () => {
expect(obliteration).to.be.true;
});
it("should throw an error if obliteration fails", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.getMetrics = async () => {
throw new Error("Error");
};
@@ -714,15 +785,6 @@ describe("JobQueue", () => {
}
});
it("should throw an error if obliteration fails but respect existing error data", async () => {
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
QueueStub,
WorkerStub
);
jobQueue.getMetrics = async () => {
const error = new Error("Error");
error.service = "otherService";