initial Pulse implementation

This commit is contained in:
Alex Holliday
2025-06-06 14:05:59 +08:00
parent 1f9fbf7115
commit 9a22d4a971
12 changed files with 455 additions and 72 deletions
+171
View File
@@ -0,0 +1,171 @@
import { Pulse } from "@pulsecron/pulse";
import { ObjectId } from "mongodb";
const SERVICE_NAME = "PulseQueue";
class PulseQueue {
static SERVICE_NAME = SERVICE_NAME;
constructor({ appSettings, db, pulseQueueHelper, logger }) {
this.db = db;
this.appSettings = appSettings;
this.pulseQueueHelper = pulseQueueHelper;
this.logger = logger;
this.init();
}
// ****************************************
// Core methods
// ****************************************
init = async () => {
try {
const mongoConnectionString =
this.appSettings.dbConnectionString || "mongodb://localhost:27017/uptime_db";
this.pulse = new Pulse({ db: { address: mongoConnectionString } });
await this.pulse.start();
this.pulse.define("monitor-job", this.pulseQueueHelper.getMonitorJob(), {});
const monitors = await this.db.getAllMonitors();
for (const monitor of monitors) {
await this.addJob(monitor._id, monitor);
}
return true;
} catch (error) {
this.logger.error({
message: "Failed to initialize PulseQueue",
service: SERVICE_NAME,
method: "init",
details: error,
});
return false;
}
};
addJob = async (monitorId, monitor) => {
this.logger.debug({
message: `Adding job ${monitor?.url ?? "No URL"}`,
service: SERVICE_NAME,
method: "addJob",
});
const intervalInSeconds = monitor.interval / 1000;
const job = this.pulse.create("monitor-job", {
monitor,
});
job.unique({ "data.monitor._id": monitor._id });
job.attrs.jobId = monitorId.toString();
job.repeatEvery(`${intervalInSeconds} seconds`);
await job.save();
};
deleteJob = async (monitor) => {
this.logger.debug({
message: `Deleting job ${monitor?.url ?? "No URL"}`,
service: SERVICE_NAME,
method: "deleteJob",
});
const result = await this.pulse.cancel({
"data.monitor._id": monitor._id,
});
console.log(result);
};
pauseJob = async (monitor) => {
const result = await this.pulse.disable({
"data.monitor._id": monitor._id,
});
if (result.length < 1) {
throw new Error("Failed to pause monitor");
}
this.logger.debug({
message: `Paused monitor ${monitor._id}`,
service: SERVICE_NAME,
method: "pauseJob",
});
};
resumeJob = async (monitor) => {
const result = await this.pulse.enable({
"data.monitor._id": monitor._id,
});
if (result.length < 1) {
throw new Error("Failed to resume monitor");
}
this.logger.debug({
message: `Resumed monitor ${monitor._id}`,
service: SERVICE_NAME,
method: "resumeJob",
});
};
shutdown = async () => {
this.logger.info({
message: "Shutting down JobQueue",
service: SERVICE_NAME,
method: "shutdown",
});
await this.pulse.stop();
};
// ****************************************
// Diagnostic methods
// ****************************************
getMetrics = async () => {
const jobs = await this.pulse.jobs();
const metrics = jobs.reduce(
(acc, job) => {
acc.jobs++;
if (job.attrs.failCount > 0 && job.attrs.failedAt >= job.attrs.lastFinishedAt) {
acc.failingJobs++;
}
if (job.attrs.lockedAt) {
acc.activeJobs++;
}
if (job.attrs.failCount > 0) {
acc.jobsWithFailures.push({
monitorId: job.attrs.data.monitor._id,
monitorUrl: job.attrs.data.monitor.url,
failCount: job.attrs.failCount,
failReason: job.attrs.failReason,
});
}
return acc;
},
{ jobs: 0, activeJobs: 0, failingJobs: 0, jobsWithFailures: [] }
);
return metrics;
};
getJobs = async () => {
const jobs = await this.pulse.jobs();
return jobs.map((job) => {
return {
monitorId: job.attrs.data.monitor._id,
monitorUrl: job.attrs.data.monitor.url,
lockedAt: job.attrs.lockedAt,
runCount: job.attrs.runCount || 0,
failCount: job.attrs.failCount || 0,
failReason: job.attrs.failReason,
};
});
};
flushQueues = async () => {
const cancelRes = await this.pulse.cancel();
await this.pulse.stop();
const initRes = await this.init();
return {
flushedJobs: cancelRes,
initSuccess: initRes,
};
};
obliterate = async () => {
await this.flushQueues();
};
}
export default PulseQueue;
@@ -0,0 +1,106 @@
const SERVICE_NAME = "PulseQueueHelper";
class PulseQueueHelper {
constructor({ db, logger, networkService, statusService, notificationService }) {
this.db = db;
this.logger = logger;
this.networkService = networkService;
this.statusService = statusService;
this.notificationService = notificationService;
}
getMonitorJob = () => {
return async (job) => {
try {
const monitor = job.attrs.data.monitor;
const monitorId = job.attrs.data.monitor._id;
if (!monitorId) {
throw new Error("No monitor id");
}
const maintenanceWindowActive = await this.isInMaintenanceWindow(monitorId);
if (maintenanceWindowActive) {
this.logger.info({
message: `Monitor ${monitorId} is in maintenance window`,
service: SERVICE_NAME,
method: "getMonitorJob",
});
return;
}
const networkResponse = await this.networkService.getStatus(monitor);
if (monitor.type === "distributed_http" || monitor.type === "distributed_test") {
await job.updateProgress(100);
return true;
}
if (!networkResponse) {
throw new Error("No network response");
}
const {
monitor: updatedMonitor,
statusChanged,
prevStatus,
} = await this.statusService.updateStatus(networkResponse);
this.notificationService
.handleNotifications({
...networkResponse,
monitor: updatedMonitor,
prevStatus,
statusChanged,
})
.catch((error) => {
this.logger.error({
message: error.message,
service: SERVICE_NAME,
method: "createJobHandler",
details: `Error sending notifications for job ${job.id}: ${error.message}`,
stack: error.stack,
});
});
} catch (error) {
this.logger.warn({
message: error.message,
service: error.service || SERVICE_NAME,
method: error.method || "getMonitorJob",
stack: error.stack,
});
throw error;
}
};
};
async isInMaintenanceWindow(monitorId) {
const maintenanceWindows = await this.db.getMaintenanceWindowsByMonitorId(monitorId);
// Check for active maintenance window:
const maintenanceWindowIsActive = maintenanceWindows.reduce((acc, window) => {
if (window.active) {
const start = new Date(window.start);
const end = new Date(window.end);
const now = new Date();
const repeatInterval = window.repeat || 0;
// If start is < now and end > now, we're in maintenance
if (start <= now && end >= now) return true;
// If maintenance window was set in the past with a repeat,
// we need to advance start and end to see if we are in range
while (start < now && repeatInterval !== 0) {
start.setTime(start.getTime() + repeatInterval);
end.setTime(end.getTime() + repeatInterval);
if (start <= now && end >= now) {
return true;
}
}
return false;
}
return acc;
}, false);
return maintenanceWindowIsActive;
}
}
export default PulseQueueHelper;