use super-simple-queue

This commit is contained in:
Alex Holliday
2025-06-27 12:38:06 +08:00
parent 4c5bb66c3d
commit f1c6beacf2
6 changed files with 1185 additions and 703 deletions

View File

@@ -53,6 +53,9 @@ import { Queue, Worker } from "bullmq";
import PulseQueue from "./service/PulseQueue/PulseQueue.js";
import PulseQueueHelper from "./service/PulseQueue/PulseQueueHelper.js";
import SuperSimpleQueue from "./service/SuperSimpleQueue/SuperSimpleQueue.js";
import SuperSimpleQueueHelper from "./service/SuperSimpleQueue/SuperSimpleQueueHelper.js";
//Network service and dependencies
import NetworkService from "./service/networkService.js";
import axios from "axios";
@@ -210,23 +213,38 @@ const startApp = async () => {
// stringService,
// });
const pulseQueueHelper = new PulseQueueHelper({
// const pulseQueueHelper = new PulseQueueHelper({
// db,
// logger,
// networkService,
// statusService,
// notificationService,
// });
// const pulseQueue = await PulseQueue.create({
// appSettings,
// db,
// pulseQueueHelper,
// logger,
// });
const superSimpleQueueHelper = new SuperSimpleQueueHelper({
db,
logger,
networkService,
statusService,
notificationService,
});
const pulseQueue = await PulseQueue.create({
const superSimpleQueue = await SuperSimpleQueue.create({
appSettings,
db,
pulseQueueHelper,
logger,
helper: superSimpleQueueHelper,
});
// Register services
// ServiceRegistry.register(JobQueue.SERVICE_NAME, jobQueue);
ServiceRegistry.register(JobQueue.SERVICE_NAME, pulseQueue);
// ServiceRegistry.register(JobQueue.SERVICE_NAME, pulseQueue);
ServiceRegistry.register(JobQueue.SERVICE_NAME, superSimpleQueue);
ServiceRegistry.register(MongoDB.SERVICE_NAME, db);
ServiceRegistry.register(SettingsService.SERVICE_NAME, settingsService);
ServiceRegistry.register(EmailService.SERVICE_NAME, emailService);

1614
server/package-lock.json generated Executable file → Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -40,6 +40,7 @@
"ping": "0.4.4",
"sharp": "0.33.5",
"ssl-checker": "2.0.10",
"super-simple-scheduler": "1.0.7",
"swagger-ui-express": "5.0.1",
"winston": "^3.13.0"
},

View File

@@ -0,0 +1,146 @@
import Scheduler from "super-simple-scheduler";
const SERVICE_NAME = "SuperSimpleQueue";
class SuperSimpleQueue {
static SERVICE_NAME = SERVICE_NAME;
constructor({ appSettings, db, logger, helper }) {
this.appSettings = appSettings;
this.db = db;
this.logger = logger;
this.helper = helper;
}
static async create({ appSettings, db, logger, helper }) {
const instance = new SuperSimpleQueue({ appSettings, db, logger, helper });
await instance.init();
return instance;
}
init = async () => {
this.scheduler = new Scheduler({
logLevel: process.env.LOG_LEVEL,
debug: process.env.NODE_ENV,
});
this.scheduler.start();
this.scheduler.addTemplate("test", this.helper.getMonitorJob());
const monitors = await this.db.getAllMonitors();
for (const monitor of monitors) {
await this.addJob(monitor._id, monitor);
}
};
addJob = async (monitorId, monitor) => {
this.scheduler.addJob({
id: monitorId.toString(),
template: "test",
repeat: monitor.interval,
data: monitor,
});
};
deleteJob = async (monitor) => {
this.scheduler.removeJob(monitor._id.toString());
};
pauseJob = async (monitor) => {
const result = this.scheduler.pauseJob(monitor._id.toString());
if (result === false) {
throw new Error("Failed to resume monitor");
}
this.logger.debug({
message: `Paused monitor ${monitor._id}`,
service: SERVICE_NAME,
method: "pauseJob",
});
};
resumeJob = async (monitor) => {
const result = this.scheduler.resumeJob(monitor._id.toString());
if (result === false) {
throw new Error("Failed to resume monitor");
}
this.logger.debug({
message: `Resumed monitor ${monitor._id}`,
service: SERVICE_NAME,
method: "resumeJob",
});
};
updateJob = async (monitor) => {
this.scheduler.updateJob(monitor._id.toString(), monitor.interval);
};
shutdown = async () => {
this.scheduler.stop();
};
getMetrics = async () => {
const jobs = this.scheduler.getJobs();
const metrics = jobs.reduce(
(acc, job) => {
acc.totalRuns += job.runCount || 0;
acc.totalFailures += job.failCount || 0;
acc.jobs++;
if (job.failCount > 0 && job.lastFailedAt >= job.lsatRunAt) {
acc.failingJobs++;
}
if (job.lockedAt) {
acc.activeJobs++;
}
if (job.failCount > 0) {
acc.jobsWithFailures.push({
monitorId: job.id,
monitorUrl: job?.data?.url || null,
monitorType: job?.data?.type || null,
failedAt: job.lastFailedAt,
failCount: job.failCount,
failReason: job.lastFailReason,
});
}
return acc;
},
{
jobs: 0,
activeJobs: 0,
failingJobs: 0,
jobsWithFailures: [],
totalRuns: 0,
totalFailures: 0,
}
);
return metrics;
};
getJobs = async () => {
const jobs = this.scheduler.getJobs();
return jobs.map((job) => {
return {
monitorId: job.id,
monitorUrl: job?.data?.url || null,
monitorType: job?.data?.type || null,
active: job.active,
lockedAt: job.lockedAt,
runCount: job.runCount || 0,
failCount: job.failCount || 0,
failReason: job.lastFailReason,
lastRunAt: job.lastRunAt,
lastFinishedAt: job.lastFinishedAt,
lastRunTook: job.lockedAt ? null : job.lastFinishedAt - job.lastRunAt,
lastFailedAt: job.lastFailedAt,
};
});
};
flushQueues = async () => {
console.log("flush not implemented");
};
obliterate = async () => {
console.log("obliterate not implemented");
};
}
export default SuperSimpleQueue;

View File

@@ -0,0 +1,99 @@
const SERVICE_NAME = "SuperSimpleQueueHelper";
class SuperSimpleQueueHelper {
constructor({ db, logger, networkService, statusService, notificationService }) {
this.db = db;
this.logger = logger;
this.networkService = networkService;
this.statusService = statusService;
this.notificationService = notificationService;
}
getMonitorJob = () => {
return async (monitor) => {
try {
const monitorId = monitor._id;
if (!monitorId) {
throw new Error("No monitor id");
}
const maintenanceWindowActive = await this.isInMaintenanceWindow(monitorId);
if (maintenanceWindowActive) {
this.logger.info({
message: `Monitor ${monitorId} is in maintenance window`,
service: SERVICE_NAME,
method: "getMonitorJob",
});
return;
}
const networkResponse = await this.networkService.getStatus(monitor);
if (!networkResponse) {
throw new Error("No network response");
}
const {
monitor: updatedMonitor,
statusChanged,
prevStatus,
} = await this.statusService.updateStatus(networkResponse);
this.notificationService
.handleNotifications({
...networkResponse,
monitor: updatedMonitor,
prevStatus,
statusChanged,
})
.catch((error) => {
this.logger.error({
message: error.message,
service: SERVICE_NAME,
method: "getMonitorJob",
details: `Error sending notifications for job ${monitor._id}: ${error.message}`,
stack: error.stack,
});
});
} catch (error) {
this.logger.warn({
message: error.message,
service: error.service || SERVICE_NAME,
method: error.method || "getMonitorJob",
stack: error.stack,
});
throw error;
}
};
};
async isInMaintenanceWindow(monitorId) {
const maintenanceWindows = await this.db.getMaintenanceWindowsByMonitorId(monitorId);
// Check for active maintenance window:
const maintenanceWindowIsActive = maintenanceWindows.reduce((acc, window) => {
if (window.active) {
const start = new Date(window.start);
const end = new Date(window.end);
const now = new Date();
const repeatInterval = window.repeat || 0;
// If start is < now and end > now, we're in maintenance
if (start <= now && end >= now) return true;
// If maintenance window was set in the past with a repeat,
// we need to advance start and end to see if we are in range
while (start < now && repeatInterval !== 0) {
start.setTime(start.getTime() + repeatInterval);
end.setTime(end.getTime() + repeatInterval);
if (start <= now && end >= now) {
return true;
}
}
return false;
}
return acc;
}, false);
return maintenanceWindowIsActive;
}
}
export default SuperSimpleQueueHelper;

View File

@@ -256,7 +256,7 @@ class NetworkService {
async requestPagespeed(monitor) {
try {
const url = monitor.url;
const updatedMonitor = { ...monitor };
const updatedMonitor = monitor.toObject();
let pagespeedUrl = `https://pagespeedonline.googleapis.com/pagespeedonline/v5/runPagespeed?url=${url}&category=seo&category=accessibility&category=best-practices&category=performance`;
const dbSettings = await this.settingsService.getDBSettings();