remove incidents from buffer

This commit is contained in:
Alex Holliday
2026-01-20 19:04:30 +00:00
parent 7afdcb8ebc
commit 69d050ff61
2 changed files with 1 additions and 216 deletions
@@ -220,162 +220,6 @@ class IncidentService {
throw error;
}
};
processIncidentsFromBuffer = async (incidentBufferItems: any[]) => {
try {
if (!incidentBufferItems || incidentBufferItems.length === 0) {
return;
}
const createItems = [];
const resolveItems = [];
for (const item of incidentBufferItems) {
if (item.action === "resolve") {
resolveItems.push(item);
} else {
createItems.push(item);
}
}
for (const item of resolveItems as any[]) {
try {
await this.resolveIncidentForMonitor(item.monitor);
} catch (error: any) {
this.logger.error({
service: SERVICE_NAME,
method: "processIncidentsFromBuffer",
message: `Failed to resolve incident from buffer: ${error.message}`,
monitorId: item.monitor?._id,
stack: error.stack,
});
}
}
if (createItems.length === 0) {
return;
}
const groupedByMonitor: Record<string, any[]> = {};
for (const item of createItems) {
if (!item.monitor || !item.monitor.id || !item.check || !item.check.id) {
this.logger.warn({
service: SERVICE_NAME,
method: "processIncidentsFromBuffer",
message: "Skipping item with missing monitor or check data",
item,
});
continue;
}
const monitorId = item.monitor.id.toString();
if (!groupedByMonitor[monitorId]) {
groupedByMonitor[monitorId] = [];
}
groupedByMonitor[monitorId].push(item);
}
const monitorIds = Object.keys(groupedByMonitor);
if (monitorIds.length === 0) {
return;
}
const activeIncidents = await this.db.incidentModule.getActiveIncidentsByMonitors(monitorIds);
const incidentsCreatedInFlush: Record<string, any> = {};
const checksToAddToIncidents = [];
const newIncidentsToCreate = [];
for (const [monitorId, items] of Object.entries(groupedByMonitor)) {
const existingIncident = activeIncidents.get(monitorId) || incidentsCreatedInFlush[monitorId];
if (existingIncident) {
const incidentId = existingIncident._id ? existingIncident._id.toString() : existingIncident;
for (const item of items) {
checksToAddToIncidents.push({
incidentId,
checkId: item.check.id.toString(),
});
}
} else {
const firstItem = items[0];
const incidentData = {
monitorId: firstItem.monitor.id,
teamId: firstItem.monitor.teamId,
type: firstItem.monitor.type,
startTime: new Date(),
status: true,
message: firstItem.check.message || null,
statusCode: firstItem.check.statusCode || null,
checks: [firstItem.check.id],
};
newIncidentsToCreate.push({
incidentData,
monitorId,
remainingChecks: items.slice(1), // Checks restantes para agregar después
});
}
}
if (newIncidentsToCreate.length > 0) {
const incidentDataArray = newIncidentsToCreate.map((item) => item.incidentData);
await this.db.incidentModule.createIncidents(incidentDataArray);
const createdIncidentsMap = await this.db.incidentModule.getActiveIncidentsByMonitors(newIncidentsToCreate.map((item) => item.monitorId));
for (const item of newIncidentsToCreate) {
const createdIncident = createdIncidentsMap.get(item.monitorId);
if (createdIncident && createdIncident._id) {
const incidentId = createdIncident._id.toString();
incidentsCreatedInFlush[item.monitorId] = incidentId;
for (const remainingItem of item.remainingChecks) {
checksToAddToIncidents.push({
incidentId,
checkId: remainingItem.check.id,
});
}
}
}
}
if (checksToAddToIncidents.length > 0) {
await this.db.incidentModule.addChecksToIncidentsBatch(checksToAddToIncidents);
}
this.logger.info({
service: SERVICE_NAME,
method: "processIncidentsFromBuffer",
message: `Processed ${incidentBufferItems.length} incident buffer items`,
created: newIncidentsToCreate.length,
checksAdded: checksToAddToIncidents.length,
resolved: resolveItems.length,
});
} catch (error: any) {
this.logger.error({
service: SERVICE_NAME,
method: "processIncidentsFromBuffer",
message: error.message,
stack: error.stack,
});
throw error;
}
};
private resolveIncidentForMonitor = async (monitor: Monitor) => {
if (!monitor?.id) {
return;
}
const incident = await this.incidentsRepository.findActiveByMonitorId(monitor.id, monitor.teamId);
if (!incident) {
return;
}
incident.status = false;
incident.endTime = new Date().toISOString();
incident.resolutionType = "automatic";
await this.incidentsRepository.updateById(incident.id, incident.teamId, incident);
};
}
export default IncidentService;
@@ -7,21 +7,17 @@ class BufferService {
static SERVICE_NAME = SERVICE_NAME;
private BUFFER_TIMEOUT: number;
private logger: any;
private incidentService: any;
private SERVICE_NAME: string;
private buffer: any[];
private incidentBuffer: any[];
private bufferTimer: NodeJS.Timeout | null = null;
private checksService: any;
constructor({ logger, incidentService, checkService }: { logger: any; incidentService: any; checkService: any }) {
constructor({ logger, checkService }: { logger: any; checkService: any }) {
this.BUFFER_TIMEOUT = config.NODE_ENV === "development" ? 10 : 1000 * 60 * 1; // 1 minute
this.logger = logger;
this.incidentService = incidentService;
this.checksService = checkService;
this.SERVICE_NAME = SERVICE_NAME;
this.buffer = [];
this.incidentBuffer = [];
this.scheduleNextFlush();
this.logger.info({
message: `Buffer service initialized, flushing every ${this.BUFFER_TIMEOUT / 1000}s`,
@@ -47,28 +43,6 @@ class BufferService {
}
}
addIncidentToBuffer({ monitor, check, action = "create" }: { monitor: any; check: Check; action?: string }) {
try {
if (!monitor || !check) {
this.logger.warn({
message: "Skipping incident buffer item: missing monitor or check",
service: this.SERVICE_NAME,
method: "addIncidentToBuffer",
});
return;
}
this.incidentBuffer.push({ monitor, check, action });
} catch (error: any) {
this.logger.error({
message: error.message,
service: this.SERVICE_NAME,
method: "addIncidentToBuffer",
stack: error.stack,
});
}
}
removeCheckFromBuffer(checkToRemove: Check) {
try {
if (!checkToRemove) {
@@ -141,40 +115,7 @@ class BufferService {
});
}
try {
if (this.incidentBuffer.length > 0 && this.incidentService) {
await this.flushIncidentBuffer();
}
} catch (error: any) {
this.logger.error({
message: error.message,
service: this.SERVICE_NAME,
method: "flushBuffer",
stack: error.stack,
});
}
this.buffer = [];
this.incidentBuffer = [];
}
async flushIncidentBuffer() {
if (!this.incidentService || this.incidentBuffer.length === 0) {
return;
}
try {
const itemsToProcess = [...this.incidentBuffer];
await this.incidentService.processIncidentsFromBuffer(itemsToProcess);
} catch (error: any) {
this.logger.error({
message: `Error flushing incident buffer: ${error.message}`,
service: this.SERVICE_NAME,
method: "flushIncidentBuffer",
stack: error.stack,
});
throw error;
}
}
}