diff --git a/client/src/Pages/Incidents2/index.jsx b/client/src/Pages/Incidents2/index.jsx index 966388b51..2f31864bf 100644 --- a/client/src/Pages/Incidents2/index.jsx +++ b/client/src/Pages/Incidents2/index.jsx @@ -87,8 +87,8 @@ const Incidents2 = () => { useEffect(() => { const lookup = monitors?.reduce((acc, monitor) => { - acc[monitor._id] = { - _id: monitor._id, + acc[monitor.id] = { + _id: monitor.id, name: monitor.name, type: monitor.type, }; diff --git a/server/src/config/services.ts b/server/src/config/services.ts index daa576bf8..90d6bb518 100644 --- a/server/src/config/services.ts +++ b/server/src/config/services.ts @@ -211,8 +211,6 @@ export const initializeServices = async ({ }); const superSimpleQueue = await SuperSimpleQueue.create({ - envSettings, - db, logger, helper: superSimpleQueueHelper, monitorsRepository, diff --git a/server/src/service/infrastructure/SuperSimpleQueue/SuperSimpleQueue.js b/server/src/service/infrastructure/SuperSimpleQueue/SuperSimpleQueue.js deleted file mode 100644 index a6637d4cc..000000000 --- a/server/src/service/infrastructure/SuperSimpleQueue/SuperSimpleQueue.js +++ /dev/null @@ -1,177 +0,0 @@ -import Scheduler from "super-simple-scheduler"; -const SERVICE_NAME = "JobQueue"; - -class SuperSimpleQueue { - static SERVICE_NAME = SERVICE_NAME; - - constructor({ envSettings, db, logger, helper, monitorsRepository }) { - this.envSettings = envSettings; - this.db = db; - this.logger = logger; - this.helper = helper; - this.monitorsRepository = monitorsRepository; - } - - get serviceName() { - return SuperSimpleQueue.SERVICE_NAME; - } - - static async create({ envSettings, db, logger, helper, monitorsRepository }) { - const instance = new SuperSimpleQueue({ envSettings, db, logger, helper, monitorsRepository }); - await instance.init(); - return instance; - } - - init = async () => { - try { - this.scheduler = new Scheduler({ - // storeType: "mongo", - // storeType: "redis", - logLevel: "debug", - debug: true, - // dbUri: this.envSettings.dbConnectionString, - }); - this.scheduler.start(); - - this.scheduler.addTemplate("monitor-job", this.helper.getMonitorJob()); - const monitors = await this.monitorsRepository.findAll(); - for (const monitor of monitors) { - const randomOffset = Math.floor(Math.random() * 100); - setTimeout(() => { - this.addJob(monitor.id, monitor); - }, randomOffset); - } - - return true; - } catch (error) { - this.logger.error({ - message: "Failed to initialize SuperSimpleQueue", - service: SERVICE_NAME, - method: "init", - details: error, - }); - return false; - } - }; - - addJob = async (monitorId, monitor) => { - this.scheduler.addJob({ - id: monitorId, - template: "monitor-job", - repeat: monitor.interval, - active: monitor.isActive, - data: monitor, - }); - }; - - deleteJob = async (monitor) => { - this.scheduler.removeJob(monitor.id); - }; - - pauseJob = async (monitor) => { - const result = this.scheduler.pauseJob(monitor.id); - if (result === false) { - throw new Error("Failed to resume monitor"); - } - this.logger.debug({ - message: `Paused monitor ${monitor.id}`, - service: SERVICE_NAME, - method: "pauseJob", - }); - }; - - resumeJob = async (monitor) => { - const result = this.scheduler.resumeJob(monitor.id); - if (result === false) { - throw new Error("Failed to resume monitor"); - } - this.logger.debug({ - message: `Resumed monitor ${monitor.id}`, - service: SERVICE_NAME, - method: "resumeJob", - }); - }; - - updateJob = async (monitor) => { - this.scheduler.updateJob(monitor.id, { repeat: monitor.interval, data: monitor }); - }; - - shutdown = async () => { - this.scheduler.stop(); - }; - - getMetrics = async () => { - const jobs = await this.scheduler.getJobs(); - const metrics = jobs.reduce( - (acc, job) => { - acc.totalRuns += job.runCount || 0; - acc.totalFailures += job.failCount || 0; - acc.jobs++; - if (job.failCount > 0 && job.lastFailedAt >= job.lsatRunAt) { - acc.failingJobs++; - } - - if (job.lockedAt) { - acc.activeJobs++; - } - - if (job.failCount > 0) { - acc.jobsWithFailures.push({ - monitorId: job.id, - monitorUrl: job?.data?.url || null, - monitorType: job?.data?.type || null, - failedAt: job.lastFailedAt, - failCount: job.failCount, - failReason: job.lastFailReason, - }); - } - return acc; - }, - { - jobs: 0, - activeJobs: 0, - failingJobs: 0, - jobsWithFailures: [], - totalRuns: 0, - totalFailures: 0, - } - ); - return metrics; - }; - - getJobs = async () => { - const jobs = await this.scheduler.getJobs(); - return jobs.map((job) => { - return { - monitorId: job.id, - monitorUrl: job?.data?.url || null, - monitorType: job?.data?.type || null, - monitorInterval: job?.data?.interval || null, - active: job.active, - lockedAt: job.lockedAt, - runCount: job.runCount || 0, - failCount: job.failCount || 0, - failReason: job.lastFailReason, - lastRunAt: job.lastRunAt, - lastFinishedAt: job.lastFinishedAt, - lastRunTook: job.lockedAt ? null : job.lastFinishedAt - job.lastRunAt, - lastFailedAt: job.lastFailedAt, - }; - }); - }; - - flushQueues = async () => { - const stopRes = this.scheduler.stop(); - const flushRes = this.scheduler.flushJobs(); - const initRes = await this.init(); - return { - success: stopRes && flushRes && initRes, - }; - }; - - obliterate = async () => { - console.log("obliterate not implemented"); - }; -} - -export default SuperSimpleQueue; diff --git a/server/src/service/infrastructure/SuperSimpleQueue/SuperSimpleQueue.ts b/server/src/service/infrastructure/SuperSimpleQueue/SuperSimpleQueue.ts new file mode 100644 index 000000000..daf7e130f --- /dev/null +++ b/server/src/service/infrastructure/SuperSimpleQueue/SuperSimpleQueue.ts @@ -0,0 +1,254 @@ +import { IMonitorsRepository } from "@/repositories/index.js"; +import Scheduler from "super-simple-scheduler"; +const SERVICE_NAME = "JobQueue"; + +type QueueJobFailure = { + monitorId: string | number; + monitorUrl: string | null; + monitorType: string | null; + failedAt: number | null; + failCount: number; + failReason: string | null; +}; + +type QueueMetrics = { + jobs: number; + activeJobs: number; + failingJobs: number; + jobsWithFailures: QueueJobFailure[]; + totalRuns: number; + totalFailures: number; +}; + +type QueueJobSummary = { + monitorId: string | number; + monitorUrl: string | null; + monitorType: string | null; + monitorInterval: number | null; + active: boolean; + lockedAt: number | null; + runCount: number; + failCount: number; + failReason: string | null; + lastRunAt: number | null; + lastFinishedAt: number | null; + lastRunTook: number | null; + lastFailedAt: number | null; +}; + +export interface ISuperSimpleQueue { + readonly serviceName: string; + init(): Promise; + addJob(monitorId: string, monitor: any): Promise; + deleteJob(monitor: any): Promise; + pauseJob(monitor: any): Promise; + resumeJob(monitor: any): Promise; + updateJob(monitor: any): Promise; + shutdown(): Promise; + getMetrics(): Promise; + getJobs(): Promise; + flushQueues(): Promise<{ success: boolean }>; + obliterate(): Promise; +} + +class SuperSimpleQueue implements ISuperSimpleQueue { + static SERVICE_NAME = SERVICE_NAME; + + private logger: any; + private helper: any; + private monitorsRepository: IMonitorsRepository; + private readonly scheduler: Scheduler; + + constructor({ + logger, + helper, + monitorsRepository, + scheduler, + }: { + logger: any; + helper: any; + monitorsRepository: IMonitorsRepository; + scheduler: Scheduler; + }) { + this.logger = logger; + this.helper = helper; + this.monitorsRepository = monitorsRepository; + this.scheduler = scheduler; + } + + get serviceName() { + return SuperSimpleQueue.SERVICE_NAME; + } + + static async create({ logger, helper, monitorsRepository }: { logger: any; helper: any; monitorsRepository: IMonitorsRepository }) { + const scheduler = new Scheduler({ + // storeType: "mongo", + // storeType: "redis", + logLevel: "debug", + // dbUri: envSettings.dbConnectionString, + }); + const instance = new SuperSimpleQueue({ logger, helper, monitorsRepository, scheduler }); + await instance.init(); + return instance; + } + + init = async () => { + try { + this.scheduler.start(); + + this.scheduler.addTemplate("monitor-job", this.helper.getMonitorJob()); + const monitors = await this.monitorsRepository.findAll(); + if (!monitors) { + return true; + } + for (const monitor of monitors) { + const randomOffset = Math.floor(Math.random() * 100); + setTimeout(() => { + this.addJob(monitor.id, monitor); + }, randomOffset); + } + + return true; + } catch (error) { + this.logger.error({ + message: "Failed to initialize SuperSimpleQueue", + service: SERVICE_NAME, + method: "init", + details: error, + }); + return false; + } + }; + + addJob = async (monitorId: string, monitor: any) => { + this.scheduler.addJob({ + id: monitorId, + template: "monitor-job", + repeat: monitor.interval, + active: monitor.isActive, + data: monitor, + }); + }; + + deleteJob = async (monitor: any) => { + this.scheduler.removeJob(monitor.id); + }; + + pauseJob = async (monitor: any) => { + const result = await this.scheduler.pauseJob(monitor.id); + if (result === false) { + throw new Error("Failed to resume monitor"); + } + this.logger.debug({ + message: `Paused monitor ${monitor.id}`, + service: SERVICE_NAME, + method: "pauseJob", + }); + }; + + resumeJob = async (monitor: any) => { + const result = await this.scheduler.resumeJob(monitor.id); + if (result === false) { + throw new Error("Failed to resume monitor"); + } + this.logger.debug({ + message: `Resumed monitor ${monitor.id}`, + service: SERVICE_NAME, + method: "resumeJob", + }); + }; + + updateJob = async (monitor: any) => { + this.scheduler.updateJob(monitor.id, { repeat: monitor.interval, data: monitor }); + }; + + shutdown = async () => { + this.scheduler.stop(); + }; + + getMetrics = async () => { + const jobs = await this.scheduler.getJobs(); + const metrics = jobs.reduce( + (acc, job) => { + const runCount = job.runCount ?? 0; + const failCount = job.failCount ?? 0; + const lastFailedAt = job.lastFailedAt ?? 0; + const lastRunAt = job.lastRunAt ?? 0; + acc.totalRuns += runCount; + acc.totalFailures += failCount; + acc.jobs++; + if (failCount > 0 && lastFailedAt >= lastRunAt) { + acc.failingJobs++; + } + + if (job.lockedAt) { + acc.activeJobs++; + } + + if (failCount > 0) { + acc.jobsWithFailures.push({ + monitorId: job.id, + monitorUrl: job?.data?.url || null, + monitorType: job?.data?.type || null, + failedAt: job.lastFailedAt ?? null, + failCount, + failReason: job.lastFailReason ?? null, + }); + } + return acc; + }, + { + jobs: 0, + activeJobs: 0, + failingJobs: 0, + jobsWithFailures: [] as Array<{ + monitorId: string | number; + monitorUrl: any; + monitorType: any; + failedAt: number | null; + failCount: number; + failReason: string | null; + }>, + totalRuns: 0, + totalFailures: 0, + } + ); + return metrics; + }; + + getJobs = async () => { + const jobs = await this.scheduler.getJobs(); + return jobs.map((job) => { + return { + monitorId: job.id, + monitorUrl: job?.data?.url || null, + monitorType: job?.data?.type || null, + monitorInterval: job?.data?.interval || null, + active: job.active, + lockedAt: job.lockedAt ?? null, + runCount: job.runCount ?? 0, + failCount: job.failCount ?? 0, + failReason: job.lastFailReason ?? null, + lastRunAt: job.lastRunAt ?? null, + lastFinishedAt: job.lastFinishedAt ?? null, + lastRunTook: job.lockedAt ? null : (job.lastFinishedAt ?? 0) - (job.lastRunAt ?? 0), + lastFailedAt: job.lastFailedAt ?? null, + }; + }); + }; + + flushQueues = async () => { + const stopRes = await this.scheduler.stop(); + const flushRes = await this.scheduler.flushJobs(); + const initRes = await this.init(); + return { + success: Boolean(stopRes && flushRes && initRes), + }; + }; + + obliterate = async () => { + console.log("obliterate not implemented"); + }; +} + +export default SuperSimpleQueue;