refactor(api): enhance metrics polling mechanism and simplify subscription handling

- Removed legacy polling methods from `MetricsResolver` and integrated polling logic into `SubscriptionPollingService` for better separation of concerns.
- Updated `SubscriptionTrackerService` to support polling configuration directly, allowing for cleaner topic registration.
- Adjusted unit tests to accommodate changes in the subscription handling and polling logic.
- Introduced `SubscriptionPollingService` to manage polling intervals and ensure efficient execution of polling tasks.
This commit is contained in:
Eli Bosley
2025-08-19 12:50:17 -04:00
parent 99da8bf309
commit ca691b71aa
7 changed files with 151 additions and 55 deletions

View File

@@ -174,12 +174,12 @@ describe('MetricsResolver', () => {
expect(subscriptionTracker.registerTopic).toHaveBeenCalledWith(
'CPU_UTILIZATION',
expect.any(Function),
expect.any(Function)
1000
);
expect(subscriptionTracker.registerTopic).toHaveBeenCalledWith(
'MEMORY_UTILIZATION',
expect.any(Function),
expect.any(Function)
2000
);
});
});

View File

@@ -19,11 +19,6 @@ import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subsc
@Resolver(() => Metrics)
export class MetricsResolver implements OnModuleInit {
private cpuPollingTimer: NodeJS.Timeout | undefined;
private memoryPollingTimer: NodeJS.Timeout | undefined;
private isCpuPollingInProgress = false;
private isMemoryPollingInProgress = false;
constructor(
private readonly cpuService: CpuService,
private readonly memoryService: MemoryService,
@@ -32,59 +27,27 @@ export class MetricsResolver implements OnModuleInit {
) {}
onModuleInit() {
// Register CPU polling with 1 second interval
this.subscriptionTracker.registerTopic(
PUBSUB_CHANNEL.CPU_UTILIZATION,
() => {
this.pollCpuUtilization();
this.cpuPollingTimer = setInterval(() => this.pollCpuUtilization(), 1000);
async () => {
const payload = await this.cpuService.generateCpuLoad();
pubsub.publish(PUBSUB_CHANNEL.CPU_UTILIZATION, { systemMetricsCpu: payload });
},
() => {
clearInterval(this.cpuPollingTimer);
this.isCpuPollingInProgress = false;
}
1000
);
// Register memory polling with 2 second interval
this.subscriptionTracker.registerTopic(
PUBSUB_CHANNEL.MEMORY_UTILIZATION,
() => {
this.pollMemoryUtilization();
this.memoryPollingTimer = setInterval(() => this.pollMemoryUtilization(), 2000);
async () => {
const payload = await this.memoryService.generateMemoryLoad();
pubsub.publish(PUBSUB_CHANNEL.MEMORY_UTILIZATION, { systemMetricsMemory: payload });
},
() => {
clearInterval(this.memoryPollingTimer);
this.isMemoryPollingInProgress = false;
}
2000
);
}
private async pollCpuUtilization(): Promise<void> {
if (this.isCpuPollingInProgress) return;
this.isCpuPollingInProgress = true;
try {
const payload = await this.cpuService.generateCpuLoad();
pubsub.publish(PUBSUB_CHANNEL.CPU_UTILIZATION, { systemMetricsCpu: payload });
} catch (error) {
console.error('Error polling CPU utilization:', error);
} finally {
this.isCpuPollingInProgress = false;
}
}
private async pollMemoryUtilization(): Promise<void> {
if (this.isMemoryPollingInProgress) return;
this.isMemoryPollingInProgress = true;
try {
const payload = await this.memoryService.generateMemoryLoad();
pubsub.publish(PUBSUB_CHANNEL.MEMORY_UTILIZATION, { systemMetricsMemory: payload });
} catch (error) {
console.error('Error polling memory utilization:', error);
} finally {
this.isMemoryPollingInProgress = false;
}
}
@Query(() => Metrics)
@UsePermissions({
action: AuthActionVerb.READ,

View File

@@ -1,10 +1,13 @@
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { SubscriptionHelperService } from '@app/unraid-api/graph/services/subscription-helper.service.js';
import { SubscriptionPollingService } from '@app/unraid-api/graph/services/subscription-polling.service.js';
import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subscription-tracker.service.js';
@Module({
providers: [SubscriptionTrackerService, SubscriptionHelperService],
exports: [SubscriptionTrackerService, SubscriptionHelperService],
imports: [ScheduleModule.forRoot()],
providers: [SubscriptionTrackerService, SubscriptionHelperService, SubscriptionPollingService],
exports: [SubscriptionTrackerService, SubscriptionHelperService, SubscriptionPollingService],
})
export class ServicesModule {}

View File

@@ -13,7 +13,11 @@ describe('SubscriptionHelperService', () => {
let loggerSpy: any;
beforeEach(() => {
trackerService = new SubscriptionTrackerService();
const mockPollingService = {
startPolling: vi.fn(),
stopPolling: vi.fn(),
};
trackerService = new SubscriptionTrackerService(mockPollingService as any);
helperService = new SubscriptionHelperService(trackerService);
loggerSpy = vi.spyOn(Logger.prototype, 'debug').mockImplementation(() => {});
});

View File

@@ -0,0 +1,91 @@
import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
import { SchedulerRegistry } from '@nestjs/schedule';
export interface PollingConfig {
name: string;
intervalMs: number;
callback: () => Promise<void>;
}
@Injectable()
export class SubscriptionPollingService implements OnModuleDestroy {
private readonly logger = new Logger(SubscriptionPollingService.name);
private readonly activePollers = new Map<string, { isPolling: boolean }>();
constructor(private readonly schedulerRegistry: SchedulerRegistry) {}
onModuleDestroy() {
this.stopAll();
}
/**
* Start polling for a specific subscription topic
*/
startPolling(config: PollingConfig): void {
const { name, intervalMs, callback } = config;
// Clean up any existing interval
this.stopPolling(name);
// Initialize polling state
this.activePollers.set(name, { isPolling: false });
// Create the polling function with guard against overlapping executions
const pollFunction = async () => {
const poller = this.activePollers.get(name);
if (!poller || poller.isPolling) {
return;
}
poller.isPolling = true;
try {
await callback();
} catch (error) {
this.logger.error(`Error in polling task '${name}'`, error);
} finally {
if (poller) {
poller.isPolling = false;
}
}
};
// Create and register the interval
const interval = setInterval(pollFunction, intervalMs);
this.schedulerRegistry.addInterval(name, interval);
this.logger.debug(`Started polling for '${name}' every ${intervalMs}ms`);
}
/**
* Stop polling for a specific subscription topic
*/
stopPolling(name: string): void {
try {
if (this.schedulerRegistry.doesExist('interval', name)) {
this.schedulerRegistry.deleteInterval(name);
this.logger.debug(`Stopped polling for '${name}'`);
}
} catch (error) {
// Interval doesn't exist, which is fine
}
// Clean up polling state
this.activePollers.delete(name);
}
/**
* Stop all active polling tasks
*/
stopAll(): void {
const intervals = this.schedulerRegistry.getIntervals();
intervals.forEach((key) => this.stopPolling(key));
this.activePollers.clear();
}
/**
* Check if polling is active for a given name
*/
isPolling(name: string): boolean {
return this.schedulerRegistry.doesExist('interval', name);
}
}

View File

@@ -9,7 +9,11 @@ describe('SubscriptionTrackerService', () => {
let loggerSpy: any;
beforeEach(() => {
service = new SubscriptionTrackerService();
const mockPollingService = {
startPolling: vi.fn(),
stopPolling: vi.fn(),
};
service = new SubscriptionTrackerService(mockPollingService as any);
// Spy on logger methods
loggerSpy = vi.spyOn(Logger.prototype, 'debug').mockImplementation(() => {});
});

View File

@@ -1,13 +1,44 @@
import { Injectable, Logger } from '@nestjs/common';
import { SubscriptionPollingService } from '@app/unraid-api/graph/services/subscription-polling.service.js';
@Injectable()
export class SubscriptionTrackerService {
private readonly logger = new Logger(SubscriptionTrackerService.name);
private subscriberCounts = new Map<string, number>();
private topicHandlers = new Map<string, { onStart: () => void; onStop: () => void }>();
public registerTopic(topic: string, onStart: () => void, onStop: () => void): void {
this.topicHandlers.set(topic, { onStart, onStop });
constructor(private readonly pollingService: SubscriptionPollingService) {}
/**
* Register a topic with optional polling support
* @param topic The topic identifier
* @param callbackOrOnStart The callback function to execute (can be async) OR onStart handler for legacy support
* @param intervalMsOrOnStop Optional interval in ms for polling OR onStop handler for legacy support
*/
public registerTopic(
topic: string,
callbackOrOnStart: () => void | Promise<void>,
intervalMsOrOnStop?: number | (() => void)
): void {
if (typeof intervalMsOrOnStop === 'number') {
// New API: callback with polling interval
const pollingConfig = {
name: topic,
intervalMs: intervalMsOrOnStop,
callback: async () => callbackOrOnStart(),
};
this.topicHandlers.set(topic, {
onStart: () => this.pollingService.startPolling(pollingConfig),
onStop: () => this.pollingService.stopPolling(topic),
});
} else {
// Legacy API: onStart and onStop handlers
this.topicHandlers.set(topic, {
onStart: callbackOrOnStart,
onStop: intervalMsOrOnStop || (() => {}),
});
}
}
public subscribe(topic: string): void {