convert super simple queue to ts

This commit is contained in:
Alex Holliday
2026-01-15 21:49:48 +00:00
parent 206dcae196
commit 5a7b51bfec
4 changed files with 256 additions and 181 deletions
+2 -2
View File
@@ -87,8 +87,8 @@ const Incidents2 = () => {
useEffect(() => {
const lookup = monitors?.reduce((acc, monitor) => {
acc[monitor._id] = {
_id: monitor._id,
acc[monitor.id] = {
_id: monitor.id,
name: monitor.name,
type: monitor.type,
};
-2
View File
@@ -211,8 +211,6 @@ export const initializeServices = async ({
});
const superSimpleQueue = await SuperSimpleQueue.create({
envSettings,
db,
logger,
helper: superSimpleQueueHelper,
monitorsRepository,
@@ -1,177 +0,0 @@
import Scheduler from "super-simple-scheduler";
const SERVICE_NAME = "JobQueue";
class SuperSimpleQueue {
static SERVICE_NAME = SERVICE_NAME;
constructor({ envSettings, db, logger, helper, monitorsRepository }) {
this.envSettings = envSettings;
this.db = db;
this.logger = logger;
this.helper = helper;
this.monitorsRepository = monitorsRepository;
}
get serviceName() {
return SuperSimpleQueue.SERVICE_NAME;
}
static async create({ envSettings, db, logger, helper, monitorsRepository }) {
const instance = new SuperSimpleQueue({ envSettings, db, logger, helper, monitorsRepository });
await instance.init();
return instance;
}
init = async () => {
try {
this.scheduler = new Scheduler({
// storeType: "mongo",
// storeType: "redis",
logLevel: "debug",
debug: true,
// dbUri: this.envSettings.dbConnectionString,
});
this.scheduler.start();
this.scheduler.addTemplate("monitor-job", this.helper.getMonitorJob());
const monitors = await this.monitorsRepository.findAll();
for (const monitor of monitors) {
const randomOffset = Math.floor(Math.random() * 100);
setTimeout(() => {
this.addJob(monitor.id, monitor);
}, randomOffset);
}
return true;
} catch (error) {
this.logger.error({
message: "Failed to initialize SuperSimpleQueue",
service: SERVICE_NAME,
method: "init",
details: error,
});
return false;
}
};
addJob = async (monitorId, monitor) => {
this.scheduler.addJob({
id: monitorId,
template: "monitor-job",
repeat: monitor.interval,
active: monitor.isActive,
data: monitor,
});
};
deleteJob = async (monitor) => {
this.scheduler.removeJob(monitor.id);
};
pauseJob = async (monitor) => {
const result = this.scheduler.pauseJob(monitor.id);
if (result === false) {
throw new Error("Failed to resume monitor");
}
this.logger.debug({
message: `Paused monitor ${monitor.id}`,
service: SERVICE_NAME,
method: "pauseJob",
});
};
resumeJob = async (monitor) => {
const result = this.scheduler.resumeJob(monitor.id);
if (result === false) {
throw new Error("Failed to resume monitor");
}
this.logger.debug({
message: `Resumed monitor ${monitor.id}`,
service: SERVICE_NAME,
method: "resumeJob",
});
};
updateJob = async (monitor) => {
this.scheduler.updateJob(monitor.id, { repeat: monitor.interval, data: monitor });
};
shutdown = async () => {
this.scheduler.stop();
};
getMetrics = async () => {
const jobs = await this.scheduler.getJobs();
const metrics = jobs.reduce(
(acc, job) => {
acc.totalRuns += job.runCount || 0;
acc.totalFailures += job.failCount || 0;
acc.jobs++;
if (job.failCount > 0 && job.lastFailedAt >= job.lsatRunAt) {
acc.failingJobs++;
}
if (job.lockedAt) {
acc.activeJobs++;
}
if (job.failCount > 0) {
acc.jobsWithFailures.push({
monitorId: job.id,
monitorUrl: job?.data?.url || null,
monitorType: job?.data?.type || null,
failedAt: job.lastFailedAt,
failCount: job.failCount,
failReason: job.lastFailReason,
});
}
return acc;
},
{
jobs: 0,
activeJobs: 0,
failingJobs: 0,
jobsWithFailures: [],
totalRuns: 0,
totalFailures: 0,
}
);
return metrics;
};
getJobs = async () => {
const jobs = await this.scheduler.getJobs();
return jobs.map((job) => {
return {
monitorId: job.id,
monitorUrl: job?.data?.url || null,
monitorType: job?.data?.type || null,
monitorInterval: job?.data?.interval || null,
active: job.active,
lockedAt: job.lockedAt,
runCount: job.runCount || 0,
failCount: job.failCount || 0,
failReason: job.lastFailReason,
lastRunAt: job.lastRunAt,
lastFinishedAt: job.lastFinishedAt,
lastRunTook: job.lockedAt ? null : job.lastFinishedAt - job.lastRunAt,
lastFailedAt: job.lastFailedAt,
};
});
};
flushQueues = async () => {
const stopRes = this.scheduler.stop();
const flushRes = this.scheduler.flushJobs();
const initRes = await this.init();
return {
success: stopRes && flushRes && initRes,
};
};
obliterate = async () => {
console.log("obliterate not implemented");
};
}
export default SuperSimpleQueue;
@@ -0,0 +1,254 @@
import { IMonitorsRepository } from "@/repositories/index.js";
import Scheduler from "super-simple-scheduler";
const SERVICE_NAME = "JobQueue";
type QueueJobFailure = {
monitorId: string | number;
monitorUrl: string | null;
monitorType: string | null;
failedAt: number | null;
failCount: number;
failReason: string | null;
};
type QueueMetrics = {
jobs: number;
activeJobs: number;
failingJobs: number;
jobsWithFailures: QueueJobFailure[];
totalRuns: number;
totalFailures: number;
};
type QueueJobSummary = {
monitorId: string | number;
monitorUrl: string | null;
monitorType: string | null;
monitorInterval: number | null;
active: boolean;
lockedAt: number | null;
runCount: number;
failCount: number;
failReason: string | null;
lastRunAt: number | null;
lastFinishedAt: number | null;
lastRunTook: number | null;
lastFailedAt: number | null;
};
export interface ISuperSimpleQueue {
readonly serviceName: string;
init(): Promise<boolean>;
addJob(monitorId: string, monitor: any): Promise<void>;
deleteJob(monitor: any): Promise<void>;
pauseJob(monitor: any): Promise<void>;
resumeJob(monitor: any): Promise<void>;
updateJob(monitor: any): Promise<void>;
shutdown(): Promise<void>;
getMetrics(): Promise<QueueMetrics>;
getJobs(): Promise<QueueJobSummary[]>;
flushQueues(): Promise<{ success: boolean }>;
obliterate(): Promise<void>;
}
class SuperSimpleQueue implements ISuperSimpleQueue {
static SERVICE_NAME = SERVICE_NAME;
private logger: any;
private helper: any;
private monitorsRepository: IMonitorsRepository;
private readonly scheduler: Scheduler;
constructor({
logger,
helper,
monitorsRepository,
scheduler,
}: {
logger: any;
helper: any;
monitorsRepository: IMonitorsRepository;
scheduler: Scheduler;
}) {
this.logger = logger;
this.helper = helper;
this.monitorsRepository = monitorsRepository;
this.scheduler = scheduler;
}
get serviceName() {
return SuperSimpleQueue.SERVICE_NAME;
}
static async create({ logger, helper, monitorsRepository }: { logger: any; helper: any; monitorsRepository: IMonitorsRepository }) {
const scheduler = new Scheduler({
// storeType: "mongo",
// storeType: "redis",
logLevel: "debug",
// dbUri: envSettings.dbConnectionString,
});
const instance = new SuperSimpleQueue({ logger, helper, monitorsRepository, scheduler });
await instance.init();
return instance;
}
init = async () => {
try {
this.scheduler.start();
this.scheduler.addTemplate("monitor-job", this.helper.getMonitorJob());
const monitors = await this.monitorsRepository.findAll();
if (!monitors) {
return true;
}
for (const monitor of monitors) {
const randomOffset = Math.floor(Math.random() * 100);
setTimeout(() => {
this.addJob(monitor.id, monitor);
}, randomOffset);
}
return true;
} catch (error) {
this.logger.error({
message: "Failed to initialize SuperSimpleQueue",
service: SERVICE_NAME,
method: "init",
details: error,
});
return false;
}
};
addJob = async (monitorId: string, monitor: any) => {
this.scheduler.addJob({
id: monitorId,
template: "monitor-job",
repeat: monitor.interval,
active: monitor.isActive,
data: monitor,
});
};
deleteJob = async (monitor: any) => {
this.scheduler.removeJob(monitor.id);
};
pauseJob = async (monitor: any) => {
const result = await this.scheduler.pauseJob(monitor.id);
if (result === false) {
throw new Error("Failed to resume monitor");
}
this.logger.debug({
message: `Paused monitor ${monitor.id}`,
service: SERVICE_NAME,
method: "pauseJob",
});
};
resumeJob = async (monitor: any) => {
const result = await this.scheduler.resumeJob(monitor.id);
if (result === false) {
throw new Error("Failed to resume monitor");
}
this.logger.debug({
message: `Resumed monitor ${monitor.id}`,
service: SERVICE_NAME,
method: "resumeJob",
});
};
updateJob = async (monitor: any) => {
this.scheduler.updateJob(monitor.id, { repeat: monitor.interval, data: monitor });
};
shutdown = async () => {
this.scheduler.stop();
};
getMetrics = async () => {
const jobs = await this.scheduler.getJobs();
const metrics = jobs.reduce(
(acc, job) => {
const runCount = job.runCount ?? 0;
const failCount = job.failCount ?? 0;
const lastFailedAt = job.lastFailedAt ?? 0;
const lastRunAt = job.lastRunAt ?? 0;
acc.totalRuns += runCount;
acc.totalFailures += failCount;
acc.jobs++;
if (failCount > 0 && lastFailedAt >= lastRunAt) {
acc.failingJobs++;
}
if (job.lockedAt) {
acc.activeJobs++;
}
if (failCount > 0) {
acc.jobsWithFailures.push({
monitorId: job.id,
monitorUrl: job?.data?.url || null,
monitorType: job?.data?.type || null,
failedAt: job.lastFailedAt ?? null,
failCount,
failReason: job.lastFailReason ?? null,
});
}
return acc;
},
{
jobs: 0,
activeJobs: 0,
failingJobs: 0,
jobsWithFailures: [] as Array<{
monitorId: string | number;
monitorUrl: any;
monitorType: any;
failedAt: number | null;
failCount: number;
failReason: string | null;
}>,
totalRuns: 0,
totalFailures: 0,
}
);
return metrics;
};
getJobs = async () => {
const jobs = await this.scheduler.getJobs();
return jobs.map((job) => {
return {
monitorId: job.id,
monitorUrl: job?.data?.url || null,
monitorType: job?.data?.type || null,
monitorInterval: job?.data?.interval || null,
active: job.active,
lockedAt: job.lockedAt ?? null,
runCount: job.runCount ?? 0,
failCount: job.failCount ?? 0,
failReason: job.lastFailReason ?? null,
lastRunAt: job.lastRunAt ?? null,
lastFinishedAt: job.lastFinishedAt ?? null,
lastRunTook: job.lockedAt ? null : (job.lastFinishedAt ?? 0) - (job.lastRunAt ?? 0),
lastFailedAt: job.lastFailedAt ?? null,
};
});
};
flushQueues = async () => {
const stopRes = await this.scheduler.stop();
const flushRes = await this.scheduler.flushJobs();
const initRes = await this.init();
return {
success: Boolean(stopRes && flushRes && initRes),
};
};
obliterate = async () => {
console.log("obliterate not implemented");
};
}
export default SuperSimpleQueue;