queue and buffer refactor

This commit is contained in:
Alex Holliday
2026-01-16 18:11:04 +00:00
parent 0f764ca084
commit 95cd33ac06
10 changed files with 218 additions and 287 deletions
+14 -8
View File
@@ -183,7 +183,17 @@ export const initializeServices = async ({
stringService,
});
const bufferService = new BufferService({ db, logger, envSettings, incidentService });
const checkService = new CheckService({
db,
settingsService,
stringService,
errorService,
monitorsRepository,
logger,
checksRepository,
});
const bufferService = new BufferService({ db, logger, envSettings, incidentService, checkService });
const statusService = new StatusService({ db, logger, buffer: bufferService, incidentService, monitorsRepository });
@@ -208,6 +218,8 @@ export const initializeServices = async ({
networkService,
statusService,
notificationService,
checkService,
buffer: bufferService,
});
const superSimpleQueue = await SuperSimpleQueue.create({
@@ -229,13 +241,7 @@ export const initializeServices = async ({
jobQueue: superSimpleQueue,
monitorsRepository,
});
const checkService = new CheckService({
db,
settingsService,
stringService,
errorService,
monitorsRepository,
});
const diagnosticService = new DiagnosticService();
const inviteService = new InviteService({
db,
@@ -1,4 +1,4 @@
import type { Check, CheckAudits, MonitorStatusResponse, MonitorType } from "@/types/index.js";
import type { Check, MonitorType } from "@/types/index.js";
import type { LatestChecksMap } from "@/repositories/checks/MongoChecksRepistory.js";
export interface PageSpeedChecksResult {
@@ -55,7 +55,8 @@ export interface UptimeChecksResult {
export interface IChecksRepository {
// create
buildCheck(status: MonitorStatusResponse, type: MonitorType): Check;
createChecks(checks: Check[]): Promise<Check[]>;
// single fetch
// collection fetch
findLatestChecksByMonitorIds(monitorIds: string[], options?: { limitPerMonitor?: number }): Promise<LatestChecksMap>;
@@ -11,10 +11,8 @@ import type {
CheckMetadata,
CheckNetworkInterfaceInfo,
GotTimings,
MonitorStatusResponse,
MonitorType,
} from "@/types/index.js";
import type { HardwareStatusPayload, PageSpeedStatusPayload } from "@/types/network.js";
import { CheckModel, type CheckDocument } from "@/db/models/index.js";
import mongoose from "mongoose";
@@ -144,11 +142,11 @@ class MongoChecksRepository implements IChecksRepository {
return undefined;
}
return {
cls: audits.cls ?? 0,
si: audits.si ?? 0,
fcp: audits.fcp ?? 0,
lcp: audits.lcp ?? 0,
tbt: audits.tbt ?? 0,
cls: audits.cls,
si: audits.si,
fcp: audits.fcp,
lcp: audits.lcp,
tbt: audits.tbt,
};
};
@@ -187,100 +185,8 @@ class MongoChecksRepository implements IChecksRepository {
};
};
buildCheck = (
statusResponse: MonitorStatusResponse<PageSpeedStatusPayload | HardwareStatusPayload | undefined>
) => {
const {
monitorId,
teamId,
type,
status,
responseTime,
code,
message,
payload,
first_byte_took,
body_read_took,
dns_took,
conn_took,
connect_took,
tls_took,
timings,
} = statusResponse;
const check: any = {
metadata: {
monitorId,
teamId,
type,
},
status,
statusCode: code,
responseTime,
timings: timings || {},
message,
first_byte_took,
body_read_took,
dns_took,
conn_took,
connect_took,
tls_took,
};
if (type === "pagespeed") {
const pageSpeedPayload = payload as PageSpeedStatusPayload | undefined;
if (!pageSpeedPayload) {
this.logger.warn({
message: "Failed to build check",
service: SERVICE_NAME,
method: "buildCheck",
details: "empty payload",
});
return undefined;
}
const categories = pageSpeedPayload.lighthouseResult?.categories ?? {};
const audits = pageSpeedPayload.lighthouseResult?.audits ?? {};
const mapAudit = (audit: any) => {
if (!audit || typeof audit !== "object") {
return undefined;
}
return {
id: audit.id,
title: audit.title,
score: typeof audit.score === "number" ? audit.score : (audit.score ?? null),
displayValue: audit.displayValue,
numericValue: typeof audit.numericValue === "number" ? audit.numericValue : undefined,
numericUnit: audit.numericUnit,
};
};
check.accessibility = (categories?.accessibility?.score || 0) * 100;
check.bestPractices = (categories?.["best-practices"]?.score || 0) * 100;
check.seo = (categories?.seo?.score || 0) * 100;
check.performance = (categories?.performance?.score || 0) * 100;
check.audits = {
cls: mapAudit(audits?.["cumulative-layout-shift"]),
si: mapAudit(audits?.["speed-index"]),
fcp: mapAudit(audits?.["first-contentful-paint"]),
lcp: mapAudit(audits?.["largest-contentful-paint"]),
tbt: mapAudit(audits?.["total-blocking-time"]),
};
}
if (type === "hardware") {
const hardwarePayload = payload as HardwareStatusPayload | undefined;
const { cpu, memory, disk, host, net } = hardwarePayload?.data ?? {};
const errorsSource = Array.isArray(hardwarePayload?.errors)
? hardwarePayload?.errors
: (hardwarePayload?.errors as { errors?: CheckErrorInfo[] } | undefined)?.errors;
check.cpu = cpu ?? {};
check.memory = memory ?? {};
check.disk = disk ?? {};
check.host = host ?? {};
check.errors = errorsSource ?? [];
check.capture = hardwarePayload?.capture ?? {};
check.net = net ?? {};
}
return check;
createChecks = async (checks: Check[]) => {
return await CheckModel.insertMany(checks);
};
findLatestChecksByMonitorIds = async (monitorIds: string[], options?: { limitPerMonitor?: number }): Promise<LatestChecksMap> => {
+86 -4
View File
@@ -1,5 +1,6 @@
import { IMonitorsRepository } from "@/repositories/index.js";
import type { MonitorType } from "@/types/index.js";
import { IChecksRepository, IMonitorsRepository } from "@/repositories/index.js";
import type { MonitorType, MonitorStatusResponse, CheckErrorInfo, Check } from "@/types/index.js";
import type { HardwareStatusPayload, PageSpeedStatusPayload } from "@/types/network.js";
const SERVICE_NAME = "checkService";
@@ -11,32 +12,113 @@ class CheckService {
private stringService: any;
private errorService: any;
private monitorsRepository: IMonitorsRepository;
private checksRepository: IChecksRepository;
private logger: any;
constructor({
db,
settingsService,
stringService,
errorService,
monitorsRepository,
logger,
checksRepository,
}: {
db: any;
settingsService: any;
stringService: any;
errorService: any;
monitorsRepository: IMonitorsRepository;
logger: any;
checksRepository: IChecksRepository;
}) {
this.db = db;
this.settingsService = settingsService;
this.stringService = stringService;
this.errorService = errorService;
this.monitorsRepository = monitorsRepository;
this.logger = logger;
this.checksRepository = checksRepository;
}
get serviceName() {
return CheckService.SERVICE_NAME;
}
buildCheck = async (statusResponse: any, type: MonitorType) => {};
createChecks = async (checks: Check[]) => {
return this.checksRepository.createChecks(checks);
};
buildCheck = (statusResponse: MonitorStatusResponse<PageSpeedStatusPayload | HardwareStatusPayload | undefined>) => {
const { monitorId, teamId, type, status, responseTime, code, message, payload, timings } = statusResponse;
const check: Partial<Check> = {
metadata: {
monitorId,
teamId,
type,
},
status,
statusCode: code,
responseTime: responseTime || 0,
timings: timings,
message,
};
if (type === "pagespeed") {
const pageSpeedPayload = payload as PageSpeedStatusPayload | undefined;
if (!pageSpeedPayload) {
this.logger.warn({
message: "Failed to build check",
service: SERVICE_NAME,
method: "buildCheck",
details: "empty payload",
});
return undefined;
}
const categories = pageSpeedPayload.lighthouseResult?.categories ?? {};
const audits = pageSpeedPayload.lighthouseResult?.audits ?? {};
const mapAudit = (audit: any) => {
if (!audit || typeof audit !== "object") {
return undefined;
}
return {
id: audit.id,
title: audit.title,
score: typeof audit.score === "number" ? audit.score : (audit.score ?? null),
displayValue: audit.displayValue,
numericValue: typeof audit.numericValue === "number" ? audit.numericValue : undefined,
numericUnit: audit.numericUnit,
};
};
check.accessibility = (categories?.accessibility?.score || 0) * 100;
check.bestPractices = (categories?.["best-practices"]?.score || 0) * 100;
check.seo = (categories?.seo?.score || 0) * 100;
check.performance = (categories?.performance?.score || 0) * 100;
check.audits = {
cls: mapAudit(audits?.["cumulative-layout-shift"]),
si: mapAudit(audits?.["speed-index"]),
fcp: mapAudit(audits?.["first-contentful-paint"]),
lcp: mapAudit(audits?.["largest-contentful-paint"]),
tbt: mapAudit(audits?.["total-blocking-time"]),
};
}
if (type === "hardware") {
const hardwarePayload = payload as HardwareStatusPayload | undefined;
const { cpu, memory, disk, host, net } = hardwarePayload?.data ?? {};
const errorsSource = Array.isArray(hardwarePayload?.errors)
? hardwarePayload?.errors
: (hardwarePayload?.errors as { errors?: CheckErrorInfo[] } | undefined)?.errors;
check.cpu = cpu;
check.memory = memory;
check.disk = disk;
check.host = host;
check.errors = errorsSource;
check.capture = hardwarePayload?.capture;
check.net = net;
}
return check;
};
getChecksByMonitor = async ({ monitorId, query, teamId }: { monitorId: string; query: any; teamId: string }) => {
if (!monitorId) {
@@ -10,6 +10,8 @@ class SuperSimpleQueueHelper {
private networkService: INetworkService;
private statusService: any;
private notificationService: any;
private checkService: any;
private buffer: any;
constructor({
db,
@@ -17,18 +19,24 @@ class SuperSimpleQueueHelper {
networkService,
statusService,
notificationService,
checkService,
buffer,
}: {
db: any;
logger: any;
networkService: INetworkService;
statusService: any;
notificationService: any;
checkService: any;
buffer: any;
}) {
this.db = db;
this.logger = logger;
this.networkService = networkService;
this.statusService = statusService;
this.notificationService = notificationService;
this.checkService = checkService;
this.buffer = buffer;
}
get serviceName() {
@@ -61,7 +69,15 @@ class SuperSimpleQueueHelper {
throw new Error("No network response");
}
const statusChangeResult = await this.statusService.updateMonitorStatus(status);
// Step 3. Build check
const check = await this.checkService.buildCheck(status);
// Step 4 Add check to buffer
this.buffer.addToBuffer({ check });
// Step 4. Update monitor status
const statusChangeResult = await this.statusService.updateMonitorStatus(status, check);
this.notificationService
.handleNotifications({
...status,
@@ -1,15 +1,38 @@
import { config } from "@/config/index.js";
import type { Check } from "@/types/index.js";
const SERVICE_NAME = "BufferService";
class BufferService {
static SERVICE_NAME = SERVICE_NAME;
constructor({ db, logger, envSettings, incidentService }) {
console.log(envSettings);
private BUFFER_TIMEOUT: number;
private db: any;
private logger: any;
private incidentService: any;
private SERVICE_NAME: string;
private buffer: any[];
private incidentBuffer: any[];
private bufferTimer: NodeJS.Timeout;
private checksService: any;
constructor({
db,
logger,
envSettings,
incidentService,
checkService,
}: {
db: any;
logger: any;
envSettings: any;
incidentService: any;
checkService: any;
}) {
this.BUFFER_TIMEOUT = config.NODE_ENV === "development" ? 10 : 1000 * 60 * 1; // 1 minute
this.db = db;
this.logger = logger;
this.incidentService = incidentService;
this.checksService = checkService;
this.SERVICE_NAME = SERVICE_NAME;
this.buffer = [];
this.incidentBuffer = [];
@@ -25,10 +48,10 @@ class BufferService {
return BufferService.SERVICE_NAME;
}
addToBuffer({ check }) {
addToBuffer({ check }: { check: Check }) {
try {
this.buffer.push(check);
} catch (error) {
} catch (error: any) {
this.logger.error({
message: error.message,
service: this.SERVICE_NAME,
@@ -38,7 +61,7 @@ class BufferService {
}
}
addIncidentToBuffer({ monitor, check, action = "create" }) {
addIncidentToBuffer({ monitor, check, action = "create" }: { monitor: any; check: Check; action?: string }) {
try {
if (!monitor || !check) {
this.logger.warn({
@@ -50,7 +73,7 @@ class BufferService {
}
this.incidentBuffer.push({ monitor, check, action });
} catch (error) {
} catch (error: any) {
this.logger.error({
message: error.message,
service: this.SERVICE_NAME,
@@ -60,20 +83,20 @@ class BufferService {
}
}
removeCheckFromBuffer(checkToRemove) {
removeCheckFromBuffer(checkToRemove: Check) {
try {
if (!checkToRemove) {
return false;
}
const index = this.buffer.findIndex((check) => {
if (checkToRemove._id && check._id) {
return check._id.toString() === checkToRemove._id.toString();
if (checkToRemove.id && check.id) {
return check.id.toString() === checkToRemove.id.toString();
}
return (
check.monitorId?.toString() === checkToRemove.monitorId?.toString() &&
check.teamId?.toString() === checkToRemove.teamId?.toString() &&
check.type === checkToRemove.type &&
check.monitorId?.toString() === checkToRemove.metadata.monitorId &&
check.teamId?.toString() === checkToRemove.metadata.teamId &&
check.type === checkToRemove.metadata.type &&
check.status === checkToRemove.status &&
check.statusCode === checkToRemove.statusCode &&
check.responseTime === checkToRemove.responseTime &&
@@ -87,7 +110,7 @@ class BufferService {
}
return false;
} catch (error) {
} catch (error: any) {
this.logger.error({
message: error.message,
service: this.SERVICE_NAME,
@@ -102,7 +125,7 @@ class BufferService {
this.bufferTimer = setTimeout(async () => {
try {
await this.flushBuffer();
} catch (error) {
} catch (error: any) {
this.logger.error({
message: `Error in flush cycle: ${error.message}`,
service: this.SERVICE_NAME,
@@ -118,9 +141,9 @@ class BufferService {
async flushBuffer() {
try {
if (this.buffer.length > 0) {
await this.db.checkModule.createChecks(this.buffer);
await this.checksService.createChecks(this.buffer);
}
} catch (error) {
} catch (error: any) {
this.logger.error({
message: error.message,
service: this.SERVICE_NAME,
@@ -133,7 +156,7 @@ class BufferService {
if (this.incidentBuffer.length > 0 && this.incidentService) {
await this.flushIncidentBuffer();
}
} catch (error) {
} catch (error: any) {
this.logger.error({
message: error.message,
service: this.SERVICE_NAME,
@@ -154,7 +177,7 @@ class BufferService {
try {
const itemsToProcess = [...this.incidentBuffer];
await this.incidentService.processIncidentsFromBuffer(itemsToProcess);
} catch (error) {
} catch (error: any) {
this.logger.error({
message: `Error flushing incident buffer: ${error.message}`,
service: this.SERVICE_NAME,
@@ -1,8 +1,15 @@
import { IMonitorsRepository } from "@/repositories/index.js";
import MonitorStats from "../../db/models/MonitorStats.js";
import { CheckModel } from "@/db/models/index.js";
import type { CheckErrorInfo, Monitor, MonitorStatusResponse, StatusChangeResult } from "@/types/index.js";
import type { HardwareStatusPayload, PageSpeedStatusPayload } from "@/types/network.js";
import type {
CheckErrorInfo,
Monitor,
MonitorStatusResponse,
StatusChangeResult,
Check,
HardwareStatusPayload,
PageSpeedStatusPayload,
} from "@/types/index.js";
const SERVICE_NAME = "StatusService";
class StatusService {
@@ -163,10 +170,9 @@ class StatusService {
};
updateMonitorStatus = async (
statusResponse: MonitorStatusResponse<PageSpeedStatusPayload | HardwareStatusPayload | undefined>
statusResponse: MonitorStatusResponse<PageSpeedStatusPayload | HardwareStatusPayload | undefined>,
check: Check
): Promise<StatusChangeResult> => {
const check = this.buildCheck(statusResponse);
await this.insertCheck(check);
try {
const { monitorId, teamId, status, code } = statusResponse;
const monitor = await this.monitorsRepository.findById(monitorId, teamId);
@@ -301,116 +307,7 @@ class StatusService {
}
};
buildCheck = (
networkResponse: MonitorStatusResponse<PageSpeedStatusPayload | HardwareStatusPayload | undefined>
) => {
const {
monitorId,
teamId,
type,
status,
responseTime,
code,
message,
payload,
first_byte_took,
body_read_took,
dns_took,
conn_took,
connect_took,
tls_took,
timings,
} = networkResponse;
const check: any = {
metadata: {
monitorId,
teamId,
type,
},
status,
statusCode: code,
responseTime,
timings: timings || {},
message,
first_byte_took,
body_read_took,
dns_took,
conn_took,
connect_took,
tls_took,
};
if (type === "pagespeed") {
const pageSpeedPayload = payload as PageSpeedStatusPayload | undefined;
if (!pageSpeedPayload) {
this.logger.warn({
message: "Failed to build check",
service: SERVICE_NAME,
method: "buildCheck",
details: "empty payload",
});
return undefined;
}
const categories = pageSpeedPayload.lighthouseResult?.categories ?? {};
const audits = pageSpeedPayload.lighthouseResult?.audits ?? {};
const mapAudit = (audit: any) => {
if (!audit || typeof audit !== "object") {
return undefined;
}
return {
id: audit.id,
title: audit.title,
score: typeof audit.score === "number" ? audit.score : (audit.score ?? null),
displayValue: audit.displayValue,
numericValue: typeof audit.numericValue === "number" ? audit.numericValue : undefined,
numericUnit: audit.numericUnit,
};
};
check.accessibility = (categories?.accessibility?.score || 0) * 100;
check.bestPractices = (categories?.["best-practices"]?.score || 0) * 100;
check.seo = (categories?.seo?.score || 0) * 100;
check.performance = (categories?.performance?.score || 0) * 100;
check.audits = {
cls: mapAudit(audits?.["cumulative-layout-shift"]),
si: mapAudit(audits?.["speed-index"]),
fcp: mapAudit(audits?.["first-contentful-paint"]),
lcp: mapAudit(audits?.["largest-contentful-paint"]),
tbt: mapAudit(audits?.["total-blocking-time"]),
};
}
if (type === "hardware") {
const hardwarePayload = payload as HardwareStatusPayload | undefined;
const { cpu, memory, disk, host, net } = hardwarePayload?.data ?? {};
const errorsSource = Array.isArray(hardwarePayload?.errors)
? hardwarePayload?.errors
: (hardwarePayload?.errors as { errors?: CheckErrorInfo[] } | undefined)?.errors;
check.cpu = cpu ?? {};
check.memory = memory ?? {};
check.disk = disk ?? {};
check.host = host ?? {};
check.errors = errorsSource ?? [];
check.capture = hardwarePayload?.capture ?? {};
check.net = net ?? {};
}
return check;
};
/**
* Inserts a check into the database based on the network response.
*
* @param {Object} networkResponse - The network response object.
* @param {string} networkResponse.monitorId - The monitor ID.
* @param {string} networkResponse.type - The type of the response.
* @param {string} networkResponse.status - The status of the response.
* @param {number} networkResponse.responseTime - The response time.
* @param {number} networkResponse.code - The status code.
* @param {string} networkResponse.message - The message.
* @param {Object} networkResponse.payload - The payload of the response.
* @returns {Promise<void>} A promise that resolves when the check is inserted.
*/
insertCheck = async (check: any) => {
insertCheck = async (check: Check) => {
try {
if (typeof check === "undefined") {
this.logger.warn({
@@ -427,7 +324,7 @@ class StatusService {
message: error.message,
service: error.service || SERVICE_NAME,
method: error.method || "insertCheck",
details: error.details || `Error inserting check for monitor: ${check?.monitorId}`,
details: error.details || `Error inserting check for monitor: ${check?.metadata.monitorId}`,
stack: error.stack,
});
}
+35 -35
View File
@@ -9,40 +9,40 @@ export interface CheckMetadata {
}
export interface CheckCpuInfo {
physical_core: number;
logical_core: number;
frequency: number;
temperature: number[];
free_percent: number;
usage_percent: number;
physical_core?: number;
logical_core?: number;
frequency?: number;
temperature?: number[];
free_percent?: number;
usage_percent?: number;
}
export interface CheckMemoryInfo {
total_bytes: number;
available_bytes: number;
used_bytes: number;
usage_percent: number;
total_bytes?: number;
available_bytes?: number;
used_bytes?: number;
usage_percent?: number;
}
export interface CheckHostInfo {
os: string;
platform: string;
kernel_version: string;
os?: string;
platform?: string;
kernel_version?: string;
}
export interface CheckCaptureInfo {
version: string;
mode: string;
version?: string;
mode?: string;
}
export interface CheckDiskInfo {
device: string;
mountpoint: string;
read_speed_bytes: number;
write_speed_bytes: number;
total_bytes: number;
free_bytes: number;
usage_percent: number;
device?: string;
mountpoint?: string;
read_speed_bytes?: number;
write_speed_bytes?: number;
total_bytes?: number;
free_bytes?: number;
usage_percent?: number;
}
export interface CheckErrorInfo {
@@ -65,11 +65,11 @@ export interface CheckNetworkInterfaceInfo {
}
export interface CheckAudits {
cls: ILighthouseAudit;
si: ILighthouseAudit;
fcp: ILighthouseAudit;
lcp: ILighthouseAudit;
tbt: ILighthouseAudit;
cls?: ILighthouseAudit;
si?: ILighthouseAudit;
fcp?: ILighthouseAudit;
lcp?: ILighthouseAudit;
tbt?: ILighthouseAudit;
}
export interface ILighthouseAudit {
@@ -86,19 +86,19 @@ export interface Check {
metadata: CheckMetadata;
status: boolean;
responseTime: number;
timings: GotTimings;
timings?: GotTimings;
statusCode: number;
message: string;
ack: boolean;
ackAt?: string | null;
expiry: string;
cpu: CheckCpuInfo;
memory: CheckMemoryInfo;
disk: CheckDiskInfo[];
host: CheckHostInfo;
errors: CheckErrorInfo[];
capture: CheckCaptureInfo;
net: CheckNetworkInterfaceInfo[];
cpu?: CheckCpuInfo;
memory?: CheckMemoryInfo;
disk?: CheckDiskInfo[];
host?: CheckHostInfo;
errors?: CheckErrorInfo[];
capture?: CheckCaptureInfo;
net?: CheckNetworkInterfaceInfo[];
accessibility?: number;
bestPractices?: number;
seo?: number;
+1 -1
View File
@@ -1,4 +1,4 @@
export const MonitorTypes = ["http", "ping", "pagespeed", "hardware", "docker", "port", "game"] as const;
export const MonitorTypes = ["http", "ping", "pagespeed", "hardware", "docker", "port", "game", "unknown"] as const;
export type MonitorType = (typeof MonitorTypes)[number];
export interface MonitorThresholds {
+1 -1
View File
@@ -16,7 +16,7 @@ import type {
export interface MonitorStatusResponse<T = unknown> {
monitorId: string;
teamId: string;
type: MonitorType | "unknown";
type: MonitorType;
status: boolean;
code: number;
message: string;