mirror of
https://github.com/unraid/api.git
synced 2026-01-06 08:39:54 -06:00
feat(api): introduce SubscriptionHelperService for improved subscription management
- Added `SubscriptionHelperService` to streamline the creation of tracked GraphQL subscriptions with automatic cleanup. - Updated `InfoResolver` to utilize the new service for managing CPU utilization subscriptions, enhancing code clarity and maintainability. - Introduced unit tests for `SubscriptionHelperService` and `SubscriptionTrackerService` to ensure robust functionality and reliability. - Refactored `SubscriptionTrackerService` to include logging for subscription events, improving observability. - Removed deprecated types and methods related to previous subscription handling, simplifying the codebase.
This commit is contained in:
@@ -978,6 +978,38 @@ type InfoCpu implements Node {
|
||||
socket: String!
|
||||
cache: JSON!
|
||||
flags: [String!]!
|
||||
|
||||
"""CPU utilization in percent"""
|
||||
utilization: Float
|
||||
}
|
||||
|
||||
"""CPU load for a single core"""
|
||||
type CpuLoad {
|
||||
"""The total CPU load on a single core, in percent."""
|
||||
load: Float!
|
||||
|
||||
"""The percentage of time the CPU spent in user space."""
|
||||
loadUser: Float!
|
||||
|
||||
"""The percentage of time the CPU spent in kernel space."""
|
||||
loadSystem: Float!
|
||||
|
||||
"""
|
||||
The percentage of time the CPU spent on low-priority (niced) user space processes.
|
||||
"""
|
||||
loadNice: Float!
|
||||
|
||||
"""The percentage of time the CPU was idle."""
|
||||
loadIdle: Float!
|
||||
|
||||
"""The percentage of time the CPU spent servicing hardware interrupts."""
|
||||
loadIrq: Float!
|
||||
}
|
||||
|
||||
type CpuUtilization implements Node {
|
||||
id: PrefixedID!
|
||||
load: Float!
|
||||
cpus: [CpuLoad!]!
|
||||
}
|
||||
|
||||
type Gpu implements Node {
|
||||
@@ -1750,160 +1782,6 @@ type Plugin {
|
||||
hasCliModule: Boolean
|
||||
}
|
||||
|
||||
type AccessUrl {
|
||||
type: URL_TYPE!
|
||||
name: String
|
||||
ipv4: URL
|
||||
ipv6: URL
|
||||
}
|
||||
|
||||
enum URL_TYPE {
|
||||
LAN
|
||||
WIREGUARD
|
||||
WAN
|
||||
MDNS
|
||||
OTHER
|
||||
DEFAULT
|
||||
}
|
||||
|
||||
"""
|
||||
A field whose value conforms to the standard URL format as specified in RFC3986: https://www.ietf.org/rfc/rfc3986.txt.
|
||||
"""
|
||||
scalar URL
|
||||
|
||||
type AccessUrlObject {
|
||||
ipv4: String
|
||||
ipv6: String
|
||||
type: URL_TYPE!
|
||||
name: String
|
||||
}
|
||||
|
||||
type ApiKeyResponse {
|
||||
valid: Boolean!
|
||||
error: String
|
||||
}
|
||||
|
||||
type MinigraphqlResponse {
|
||||
status: MinigraphStatus!
|
||||
timeout: Int
|
||||
error: String
|
||||
}
|
||||
|
||||
"""The status of the minigraph"""
|
||||
enum MinigraphStatus {
|
||||
PRE_INIT
|
||||
CONNECTING
|
||||
CONNECTED
|
||||
PING_FAILURE
|
||||
ERROR_RETRYING
|
||||
}
|
||||
|
||||
type CloudResponse {
|
||||
status: String!
|
||||
ip: String
|
||||
error: String
|
||||
}
|
||||
|
||||
type RelayResponse {
|
||||
status: String!
|
||||
timeout: String
|
||||
error: String
|
||||
}
|
||||
|
||||
type Cloud {
|
||||
error: String
|
||||
apiKey: ApiKeyResponse!
|
||||
relay: RelayResponse
|
||||
minigraphql: MinigraphqlResponse!
|
||||
cloud: CloudResponse!
|
||||
allowedOrigins: [String!]!
|
||||
}
|
||||
|
||||
type RemoteAccess {
|
||||
"""The type of WAN access used for Remote Access"""
|
||||
accessType: WAN_ACCESS_TYPE!
|
||||
|
||||
"""The type of port forwarding used for Remote Access"""
|
||||
forwardType: WAN_FORWARD_TYPE
|
||||
|
||||
"""The port used for Remote Access"""
|
||||
port: Int
|
||||
}
|
||||
|
||||
enum WAN_ACCESS_TYPE {
|
||||
DYNAMIC
|
||||
ALWAYS
|
||||
DISABLED
|
||||
}
|
||||
|
||||
enum WAN_FORWARD_TYPE {
|
||||
UPNP
|
||||
STATIC
|
||||
}
|
||||
|
||||
type DynamicRemoteAccessStatus {
|
||||
"""The type of dynamic remote access that is enabled"""
|
||||
enabledType: DynamicRemoteAccessType!
|
||||
|
||||
"""The type of dynamic remote access that is currently running"""
|
||||
runningType: DynamicRemoteAccessType!
|
||||
|
||||
"""Any error message associated with the dynamic remote access"""
|
||||
error: String
|
||||
}
|
||||
|
||||
enum DynamicRemoteAccessType {
|
||||
STATIC
|
||||
UPNP
|
||||
DISABLED
|
||||
}
|
||||
|
||||
type ConnectSettingsValues {
|
||||
"""The type of WAN access used for Remote Access"""
|
||||
accessType: WAN_ACCESS_TYPE!
|
||||
|
||||
"""The type of port forwarding used for Remote Access"""
|
||||
forwardType: WAN_FORWARD_TYPE
|
||||
|
||||
"""The port used for Remote Access"""
|
||||
port: Int
|
||||
}
|
||||
|
||||
type ConnectSettings implements Node {
|
||||
id: PrefixedID!
|
||||
|
||||
"""The data schema for the Connect settings"""
|
||||
dataSchema: JSON!
|
||||
|
||||
"""The UI schema for the Connect settings"""
|
||||
uiSchema: JSON!
|
||||
|
||||
"""The values for the Connect settings"""
|
||||
values: ConnectSettingsValues!
|
||||
}
|
||||
|
||||
type Connect implements Node {
|
||||
id: PrefixedID!
|
||||
|
||||
"""The status of dynamic remote access"""
|
||||
dynamicRemoteAccess: DynamicRemoteAccessStatus!
|
||||
|
||||
"""The settings for the Connect instance"""
|
||||
settings: ConnectSettings!
|
||||
}
|
||||
|
||||
type Network implements Node {
|
||||
id: PrefixedID!
|
||||
accessUrls: [AccessUrl!]
|
||||
}
|
||||
|
||||
input AccessUrlObjectInput {
|
||||
ipv4: String
|
||||
ipv6: String
|
||||
type: URL_TYPE!
|
||||
name: String
|
||||
}
|
||||
|
||||
"\n### Description:\n\nID scalar type that prefixes the underlying ID with the server identifier on output and strips it on input.\n\nWe use this scalar type to ensure that the ID is unique across all servers, allowing the same underlying resource ID to be used across different server instances.\n\n#### Input Behavior:\n\nWhen providing an ID as input (e.g., in arguments or input objects), the server identifier prefix ('<serverId>:') is optional.\n\n- If the prefix is present (e.g., '123:456'), it will be automatically stripped, and only the underlying ID ('456') will be used internally.\n- If the prefix is absent (e.g., '456'), the ID will be used as-is.\n\nThis makes it flexible for clients, as they don't strictly need to know or provide the server ID.\n\n#### Output Behavior:\n\nWhen an ID is returned in the response (output), it will *always* be prefixed with the current server's unique identifier (e.g., '123:456').\n\n#### Example:\n\nNote: The server identifier is '123' in this example.\n\n##### Input (Prefix Optional):\n```graphql\n# Both of these are valid inputs resolving to internal ID '456'\n{\n someQuery(id: \"123:456\") { ... }\n anotherQuery(id: \"456\") { ... }\n}\n```\n\n##### Output (Prefix Always Added):\n```graphql\n# Assuming internal ID is '456'\n{\n \"data\": {\n \"someResource\": {\n \"id\": \"123:456\" \n }\n }\n}\n```\n "
|
||||
scalar PrefixedID
|
||||
|
||||
@@ -1920,6 +1798,7 @@ type Query {
|
||||
display: Display!
|
||||
flash: Flash!
|
||||
info: Info!
|
||||
cpuUtilization: CpuUtilization!
|
||||
logFiles: [LogFile!]!
|
||||
logFile(path: String!, lines: Int, startLine: Int): LogFileContent!
|
||||
me: UserAccount!
|
||||
@@ -1967,10 +1846,6 @@ type Query {
|
||||
|
||||
"""List all installed plugins with their metadata"""
|
||||
plugins: [Plugin!]!
|
||||
remoteAccess: RemoteAccess!
|
||||
connect: Connect!
|
||||
network: Network!
|
||||
cloud: Cloud!
|
||||
}
|
||||
|
||||
type Mutation {
|
||||
@@ -2018,11 +1893,6 @@ type Mutation {
|
||||
Remove one or more plugins from the API. Returns false if restart was triggered automatically, true if manual restart is required.
|
||||
"""
|
||||
removePlugin(input: PluginManagementInput!): Boolean!
|
||||
updateApiSettings(input: ConnectSettingsInput!): ConnectSettingsValues!
|
||||
connectSignIn(input: ConnectSignInInput!): Boolean!
|
||||
connectSignOut: Boolean!
|
||||
setupRemoteAccess(input: SetupRemoteAccessInput!): Boolean!
|
||||
enableDynamicRemoteAccess(input: EnableDynamicRemoteAccessInput!): Boolean!
|
||||
}
|
||||
|
||||
input NotificationData {
|
||||
@@ -2140,69 +2010,10 @@ input PluginManagementInput {
|
||||
restart: Boolean! = true
|
||||
}
|
||||
|
||||
input ConnectSettingsInput {
|
||||
"""The type of WAN access to use for Remote Access"""
|
||||
accessType: WAN_ACCESS_TYPE
|
||||
|
||||
"""The type of port forwarding to use for Remote Access"""
|
||||
forwardType: WAN_FORWARD_TYPE
|
||||
|
||||
"""
|
||||
The port to use for Remote Access. Not required for UPNP forwardType. Required for STATIC forwardType. Ignored if accessType is DISABLED or forwardType is UPNP.
|
||||
"""
|
||||
port: Int
|
||||
}
|
||||
|
||||
input ConnectSignInInput {
|
||||
"""The API key for authentication"""
|
||||
apiKey: String!
|
||||
|
||||
"""User information for the sign-in"""
|
||||
userInfo: ConnectUserInfoInput
|
||||
}
|
||||
|
||||
input ConnectUserInfoInput {
|
||||
"""The preferred username of the user"""
|
||||
preferred_username: String!
|
||||
|
||||
"""The email address of the user"""
|
||||
email: String!
|
||||
|
||||
"""The avatar URL of the user"""
|
||||
avatar: String
|
||||
}
|
||||
|
||||
input SetupRemoteAccessInput {
|
||||
"""The type of WAN access to use for Remote Access"""
|
||||
accessType: WAN_ACCESS_TYPE!
|
||||
|
||||
"""The type of port forwarding to use for Remote Access"""
|
||||
forwardType: WAN_FORWARD_TYPE
|
||||
|
||||
"""
|
||||
The port to use for Remote Access. Not required for UPNP forwardType. Required for STATIC forwardType. Ignored if accessType is DISABLED or forwardType is UPNP.
|
||||
"""
|
||||
port: Int
|
||||
}
|
||||
|
||||
input EnableDynamicRemoteAccessInput {
|
||||
"""The AccessURL Input for dynamic remote access"""
|
||||
url: AccessUrlInput!
|
||||
|
||||
"""Whether to enable or disable dynamic remote access"""
|
||||
enabled: Boolean!
|
||||
}
|
||||
|
||||
input AccessUrlInput {
|
||||
type: URL_TYPE!
|
||||
name: String
|
||||
ipv4: URL
|
||||
ipv6: URL
|
||||
}
|
||||
|
||||
type Subscription {
|
||||
displaySubscription: Display!
|
||||
infoSubscription: Info!
|
||||
cpuUtilization: CpuUtilization!
|
||||
logFile(path: String!): LogFileContent!
|
||||
notificationAdded: Notification!
|
||||
notificationsOverview: NotificationOverview!
|
||||
|
||||
@@ -27,6 +27,7 @@ import {
|
||||
Versions,
|
||||
} from '@app/unraid-api/graph/resolvers/info/info.model.js';
|
||||
import { InfoService } from '@app/unraid-api/graph/resolvers/info/info.service.js';
|
||||
import { SubscriptionHelperService } from '@app/unraid-api/graph/services/subscription-helper.service.js';
|
||||
import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subscription-tracker.service.js';
|
||||
|
||||
@Resolver(() => Info)
|
||||
@@ -36,7 +37,8 @@ export class InfoResolver implements OnModuleInit {
|
||||
constructor(
|
||||
private readonly infoService: InfoService,
|
||||
private readonly displayService: DisplayService,
|
||||
private readonly subscriptionTracker: SubscriptionTrackerService
|
||||
private readonly subscriptionTracker: SubscriptionTrackerService,
|
||||
private readonly subscriptionHelper: SubscriptionHelperService
|
||||
) {}
|
||||
|
||||
onModuleInit() {
|
||||
@@ -159,18 +161,7 @@ export class InfoResolver implements OnModuleInit {
|
||||
possession: AuthPossession.ANY,
|
||||
})
|
||||
public async cpuUtilizationSubscription() {
|
||||
const iterator = createSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
return {
|
||||
[Symbol.asyncIterator]: () => {
|
||||
this.subscriptionTracker.subscribe(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
return iterator[Symbol.asyncIterator]();
|
||||
},
|
||||
return: () => {
|
||||
this.subscriptionTracker.unsubscribe(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
return iterator.return();
|
||||
},
|
||||
};
|
||||
return this.subscriptionHelper.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { SubscriptionHelperService } from '@app/unraid-api/graph/services/subscription-helper.service.js';
|
||||
import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subscription-tracker.service.js';
|
||||
|
||||
@Module({
|
||||
providers: [SubscriptionTrackerService],
|
||||
exports: [SubscriptionTrackerService],
|
||||
providers: [SubscriptionTrackerService, SubscriptionHelperService],
|
||||
exports: [SubscriptionTrackerService, SubscriptionHelperService],
|
||||
})
|
||||
export class ServicesModule {}
|
||||
|
||||
@@ -0,0 +1,302 @@
|
||||
import { Logger } from '@nestjs/common';
|
||||
|
||||
import { PubSub } from 'graphql-subscriptions';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { pubsub, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { SubscriptionHelperService } from '@app/unraid-api/graph/services/subscription-helper.service.js';
|
||||
import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subscription-tracker.service.js';
|
||||
|
||||
describe('SubscriptionHelperService', () => {
|
||||
let helperService: SubscriptionHelperService;
|
||||
let trackerService: SubscriptionTrackerService;
|
||||
let loggerSpy: any;
|
||||
|
||||
beforeEach(() => {
|
||||
trackerService = new SubscriptionTrackerService();
|
||||
helperService = new SubscriptionHelperService(trackerService);
|
||||
loggerSpy = vi.spyOn(Logger.prototype, 'debug').mockImplementation(() => {});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('createTrackedSubscription', () => {
|
||||
it('should create an async iterator that tracks subscriptions', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
expect(iterator).toBeDefined();
|
||||
expect(iterator.next).toBeDefined();
|
||||
expect(iterator.return).toBeDefined();
|
||||
expect(iterator.throw).toBeDefined();
|
||||
expect(iterator[Symbol.asyncIterator]).toBeDefined();
|
||||
|
||||
// Should have subscribed
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
});
|
||||
|
||||
it('should return itself when Symbol.asyncIterator is called', () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
expect(iterator[Symbol.asyncIterator]()).toBe(iterator);
|
||||
});
|
||||
|
||||
it('should unsubscribe when return() is called', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
|
||||
await iterator.return?.();
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
});
|
||||
|
||||
it('should unsubscribe when throw() is called', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
|
||||
try {
|
||||
await iterator.throw?.(new Error('Test error'));
|
||||
} catch (e) {
|
||||
// Expected to throw
|
||||
}
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('integration with pubsub', () => {
|
||||
it('should receive published messages', async () => {
|
||||
const iterator = helperService.createTrackedSubscription<{ cpuUtilization: any }>(
|
||||
PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
|
||||
const testData = {
|
||||
cpuUtilization: {
|
||||
id: 'test',
|
||||
load: 50,
|
||||
cpus: [],
|
||||
},
|
||||
};
|
||||
|
||||
// Set up the consumption promise first
|
||||
const consumePromise = iterator.next();
|
||||
|
||||
// Give a small delay to ensure subscription is fully set up
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Publish a message
|
||||
await (pubsub as PubSub).publish(PUBSUB_CHANNEL.CPU_UTILIZATION, testData);
|
||||
|
||||
// Wait for the message
|
||||
const result = await consumePromise;
|
||||
|
||||
expect(result.done).toBe(false);
|
||||
expect(result.value).toEqual(testData);
|
||||
|
||||
await iterator.return?.();
|
||||
});
|
||||
|
||||
it('should handle multiple subscribers independently', async () => {
|
||||
// Register handlers to verify start/stop behavior
|
||||
const onStart = vi.fn();
|
||||
const onStop = vi.fn();
|
||||
trackerService.registerTopic(PUBSUB_CHANNEL.CPU_UTILIZATION, onStart, onStop);
|
||||
|
||||
// Create first subscriber
|
||||
const iterator1 = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(onStart).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Create second subscriber
|
||||
const iterator2 = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(2);
|
||||
expect(onStart).toHaveBeenCalledTimes(1); // Should not call again
|
||||
|
||||
// Create third subscriber
|
||||
const iterator3 = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(3);
|
||||
|
||||
// Set up consumption promises first
|
||||
const consume1 = iterator1.next();
|
||||
const consume2 = iterator2.next();
|
||||
const consume3 = iterator3.next();
|
||||
|
||||
// Give a small delay to ensure subscriptions are fully set up
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Publish a message - all should receive it
|
||||
const testData = { cpuUtilization: { id: 'test', load: 75, cpus: [] } };
|
||||
await (pubsub as PubSub).publish(PUBSUB_CHANNEL.CPU_UTILIZATION, testData);
|
||||
|
||||
const [result1, result2, result3] = await Promise.all([consume1, consume2, consume3]);
|
||||
|
||||
expect(result1.value).toEqual(testData);
|
||||
expect(result2.value).toEqual(testData);
|
||||
expect(result3.value).toEqual(testData);
|
||||
|
||||
// Clean up first subscriber
|
||||
await iterator1.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(2);
|
||||
expect(onStop).not.toHaveBeenCalled();
|
||||
|
||||
// Clean up second subscriber
|
||||
await iterator2.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(onStop).not.toHaveBeenCalled();
|
||||
|
||||
// Clean up last subscriber - should trigger onStop
|
||||
await iterator3.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(onStop).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should handle rapid subscribe/unsubscribe cycles', async () => {
|
||||
const iterations = 10;
|
||||
|
||||
for (let i = 0; i < iterations; i++) {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
|
||||
await iterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
}
|
||||
});
|
||||
|
||||
it('should properly clean up on error', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
|
||||
const testError = new Error('Test error');
|
||||
|
||||
try {
|
||||
await iterator.throw?.(testError);
|
||||
expect.fail('Should have thrown');
|
||||
} catch (error) {
|
||||
expect(error).toBe(testError);
|
||||
}
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
});
|
||||
|
||||
it('should log debug messages for subscription lifecycle', async () => {
|
||||
vi.clearAllMocks();
|
||||
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('Subscription added for topic')
|
||||
);
|
||||
|
||||
await iterator.return?.();
|
||||
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('Subscription removed for topic')
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('different topic types', () => {
|
||||
it('should handle INFO channel subscriptions', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.INFO);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
|
||||
// Set up consumption promise first
|
||||
const consumePromise = iterator.next();
|
||||
|
||||
// Give a small delay to ensure subscription is fully set up
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
const testData = { info: { id: 'test-info' } };
|
||||
await (pubsub as PubSub).publish(PUBSUB_CHANNEL.INFO, testData);
|
||||
|
||||
const result = await consumePromise;
|
||||
expect(result.value).toEqual(testData);
|
||||
|
||||
await iterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.INFO)).toBe(0);
|
||||
});
|
||||
|
||||
it('should track multiple different topics independently', async () => {
|
||||
const cpuIterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
const infoIterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.INFO);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
|
||||
const allCounts = trackerService.getAllSubscriberCounts();
|
||||
expect(allCounts.get(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(allCounts.get(PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
|
||||
await cpuIterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
|
||||
await infoIterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.INFO)).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('edge cases', () => {
|
||||
it('should handle return() called multiple times', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
|
||||
await iterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
|
||||
// Second return should be idempotent
|
||||
await iterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
|
||||
// Check that idempotent message was logged
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('no active subscribers (idempotent)')
|
||||
);
|
||||
});
|
||||
|
||||
it('should handle async iterator protocol correctly', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
// Test that it works in for-await loop (would use Symbol.asyncIterator)
|
||||
const receivedMessages: any[] = [];
|
||||
const maxMessages = 3;
|
||||
|
||||
// Start consuming in background
|
||||
const consumePromise = (async () => {
|
||||
let count = 0;
|
||||
for await (const message of iterator) {
|
||||
receivedMessages.push(message);
|
||||
count++;
|
||||
if (count >= maxMessages) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
||||
// Publish messages
|
||||
for (let i = 0; i < maxMessages; i++) {
|
||||
await (pubsub as PubSub).publish(PUBSUB_CHANNEL.CPU_UTILIZATION, {
|
||||
cpuUtilization: { id: `test-${i}`, load: i * 10, cpus: [] },
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for consumption to complete
|
||||
await consumePromise;
|
||||
|
||||
expect(receivedMessages).toHaveLength(maxMessages);
|
||||
expect(receivedMessages[0].cpuUtilization.load).toBe(0);
|
||||
expect(receivedMessages[1].cpuUtilization.load).toBe(10);
|
||||
expect(receivedMessages[2].cpuUtilization.load).toBe(20);
|
||||
|
||||
// Clean up
|
||||
await iterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,57 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { createSubscription, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subscription-tracker.service.js';
|
||||
|
||||
/**
|
||||
* Helper service for creating tracked GraphQL subscriptions with automatic cleanup
|
||||
*/
|
||||
@Injectable()
|
||||
export class SubscriptionHelperService {
|
||||
constructor(private readonly subscriptionTracker: SubscriptionTrackerService) {}
|
||||
|
||||
/**
|
||||
* Creates a tracked async iterator that automatically handles subscription/unsubscription
|
||||
* @param topic The subscription topic/channel to subscribe to
|
||||
* @returns A proxy async iterator with automatic cleanup
|
||||
*/
|
||||
public createTrackedSubscription<T = any>(topic: PUBSUB_CHANNEL): AsyncIterableIterator<T> {
|
||||
const iterator = createSubscription(topic) as AsyncIterable<T>;
|
||||
const innerIterator = iterator[Symbol.asyncIterator]();
|
||||
|
||||
// Subscribe when the subscription starts
|
||||
this.subscriptionTracker.subscribe(topic);
|
||||
|
||||
// Return a proxy async iterator that properly handles cleanup
|
||||
const proxyIterator: AsyncIterableIterator<T> = {
|
||||
next: () => innerIterator.next(),
|
||||
|
||||
return: async () => {
|
||||
// Cleanup: unsubscribe from tracker
|
||||
this.subscriptionTracker.unsubscribe(topic);
|
||||
|
||||
// Forward the return call to inner iterator
|
||||
if (innerIterator.return) {
|
||||
return innerIterator.return();
|
||||
}
|
||||
return Promise.resolve({ value: undefined, done: true });
|
||||
},
|
||||
|
||||
throw: async (error?: any) => {
|
||||
// Cleanup: unsubscribe from tracker on error
|
||||
this.subscriptionTracker.unsubscribe(topic);
|
||||
|
||||
// Forward the throw call to inner iterator
|
||||
if (innerIterator.throw) {
|
||||
return innerIterator.throw(error);
|
||||
}
|
||||
return Promise.reject(error);
|
||||
},
|
||||
|
||||
// The proxy iterator returns itself for Symbol.asyncIterator
|
||||
[Symbol.asyncIterator]: () => proxyIterator,
|
||||
};
|
||||
|
||||
return proxyIterator;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,285 @@
|
||||
import { Logger } from '@nestjs/common';
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subscription-tracker.service.js';
|
||||
|
||||
describe('SubscriptionTrackerService', () => {
|
||||
let service: SubscriptionTrackerService;
|
||||
let loggerSpy: any;
|
||||
|
||||
beforeEach(() => {
|
||||
service = new SubscriptionTrackerService();
|
||||
// Spy on logger methods
|
||||
loggerSpy = vi.spyOn(Logger.prototype, 'debug').mockImplementation(() => {});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('registerTopic', () => {
|
||||
it('should register topic handlers', () => {
|
||||
const onStart = vi.fn();
|
||||
const onStop = vi.fn();
|
||||
|
||||
service.registerTopic('TEST_TOPIC', onStart, onStop);
|
||||
|
||||
// Verify handlers are stored (indirectly through subscribe/unsubscribe)
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(onStart).toHaveBeenCalledTimes(1);
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(onStop).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('subscribe', () => {
|
||||
it('should increment subscriber count', () => {
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(1);
|
||||
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(2);
|
||||
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(3);
|
||||
});
|
||||
|
||||
it('should call onStart handler only for first subscriber', () => {
|
||||
const onStart = vi.fn();
|
||||
const onStop = vi.fn();
|
||||
|
||||
service.registerTopic('TEST_TOPIC', onStart, onStop);
|
||||
|
||||
// First subscriber should trigger onStart
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(onStart).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Additional subscribers should not trigger onStart
|
||||
service.subscribe('TEST_TOPIC');
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(onStart).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should log subscription events', () => {
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
"Subscription added for topic 'TEST_TOPIC': 1 active subscriber(s)"
|
||||
);
|
||||
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
"Subscription added for topic 'TEST_TOPIC': 2 active subscriber(s)"
|
||||
);
|
||||
});
|
||||
|
||||
it('should log when starting a topic', () => {
|
||||
const onStart = vi.fn();
|
||||
const onStop = vi.fn();
|
||||
|
||||
service.registerTopic('TEST_TOPIC', onStart, onStop);
|
||||
service.subscribe('TEST_TOPIC');
|
||||
|
||||
expect(loggerSpy).toHaveBeenCalledWith("Starting topic 'TEST_TOPIC' (first subscriber)");
|
||||
});
|
||||
});
|
||||
|
||||
describe('unsubscribe', () => {
|
||||
it('should decrement subscriber count', () => {
|
||||
service.subscribe('TEST_TOPIC');
|
||||
service.subscribe('TEST_TOPIC');
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(3);
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(2);
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(1);
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(0);
|
||||
});
|
||||
|
||||
it('should call onStop handler only when last subscriber unsubscribes', () => {
|
||||
const onStart = vi.fn();
|
||||
const onStop = vi.fn();
|
||||
|
||||
service.registerTopic('TEST_TOPIC', onStart, onStop);
|
||||
|
||||
service.subscribe('TEST_TOPIC');
|
||||
service.subscribe('TEST_TOPIC');
|
||||
service.subscribe('TEST_TOPIC');
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(onStop).not.toHaveBeenCalled();
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(onStop).not.toHaveBeenCalled();
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(onStop).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should be idempotent when called with no subscribers', () => {
|
||||
const onStart = vi.fn();
|
||||
const onStop = vi.fn();
|
||||
|
||||
service.registerTopic('TEST_TOPIC', onStart, onStop);
|
||||
|
||||
// Unsubscribe without any subscribers
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(onStop).not.toHaveBeenCalled();
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(0);
|
||||
|
||||
// Should log idempotent message
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
"Unsubscribe called for topic 'TEST_TOPIC' but no active subscribers (idempotent)"
|
||||
);
|
||||
});
|
||||
|
||||
it('should log unsubscription events', () => {
|
||||
service.subscribe('TEST_TOPIC');
|
||||
service.subscribe('TEST_TOPIC');
|
||||
|
||||
vi.clearAllMocks();
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
"Subscription removed for topic 'TEST_TOPIC': 1 active subscriber(s) remaining"
|
||||
);
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
"Subscription removed for topic 'TEST_TOPIC': 0 active subscriber(s) remaining"
|
||||
);
|
||||
});
|
||||
|
||||
it('should log when stopping a topic', () => {
|
||||
const onStart = vi.fn();
|
||||
const onStop = vi.fn();
|
||||
|
||||
service.registerTopic('TEST_TOPIC', onStart, onStop);
|
||||
service.subscribe('TEST_TOPIC');
|
||||
|
||||
vi.clearAllMocks();
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
"Stopping topic 'TEST_TOPIC' (last subscriber removed)"
|
||||
);
|
||||
});
|
||||
|
||||
it('should delete topic entry when count reaches zero', () => {
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(1);
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(0);
|
||||
|
||||
// Should return 0 for non-existent topics
|
||||
expect(service.getAllSubscriberCounts().has('TEST_TOPIC')).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getSubscriberCount', () => {
|
||||
it('should return correct count for active topic', () => {
|
||||
service.subscribe('TEST_TOPIC');
|
||||
service.subscribe('TEST_TOPIC');
|
||||
|
||||
expect(service.getSubscriberCount('TEST_TOPIC')).toBe(2);
|
||||
});
|
||||
|
||||
it('should return 0 for non-existent topic', () => {
|
||||
expect(service.getSubscriberCount('UNKNOWN_TOPIC')).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getAllSubscriberCounts', () => {
|
||||
it('should return all active topics and counts', () => {
|
||||
service.subscribe('TOPIC_1');
|
||||
service.subscribe('TOPIC_1');
|
||||
service.subscribe('TOPIC_2');
|
||||
service.subscribe('TOPIC_3');
|
||||
service.subscribe('TOPIC_3');
|
||||
service.subscribe('TOPIC_3');
|
||||
|
||||
const counts = service.getAllSubscriberCounts();
|
||||
|
||||
expect(counts.get('TOPIC_1')).toBe(2);
|
||||
expect(counts.get('TOPIC_2')).toBe(1);
|
||||
expect(counts.get('TOPIC_3')).toBe(3);
|
||||
});
|
||||
|
||||
it('should return empty map when no subscribers', () => {
|
||||
const counts = service.getAllSubscriberCounts();
|
||||
expect(counts.size).toBe(0);
|
||||
});
|
||||
|
||||
it('should return a copy of the internal map', () => {
|
||||
service.subscribe('TEST_TOPIC');
|
||||
|
||||
const counts1 = service.getAllSubscriberCounts();
|
||||
counts1.set('TEST_TOPIC', 999);
|
||||
|
||||
const counts2 = service.getAllSubscriberCounts();
|
||||
expect(counts2.get('TEST_TOPIC')).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('complex scenarios', () => {
|
||||
it('should handle multiple topics independently', () => {
|
||||
const onStart1 = vi.fn();
|
||||
const onStop1 = vi.fn();
|
||||
const onStart2 = vi.fn();
|
||||
const onStop2 = vi.fn();
|
||||
|
||||
service.registerTopic('TOPIC_1', onStart1, onStop1);
|
||||
service.registerTopic('TOPIC_2', onStart2, onStop2);
|
||||
|
||||
service.subscribe('TOPIC_1');
|
||||
expect(onStart1).toHaveBeenCalledTimes(1);
|
||||
expect(onStart2).not.toHaveBeenCalled();
|
||||
|
||||
service.subscribe('TOPIC_2');
|
||||
expect(onStart2).toHaveBeenCalledTimes(1);
|
||||
|
||||
service.unsubscribe('TOPIC_1');
|
||||
expect(onStop1).toHaveBeenCalledTimes(1);
|
||||
expect(onStop2).not.toHaveBeenCalled();
|
||||
|
||||
service.unsubscribe('TOPIC_2');
|
||||
expect(onStop2).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should handle resubscription after all unsubscribed', () => {
|
||||
const onStart = vi.fn();
|
||||
const onStop = vi.fn();
|
||||
|
||||
service.registerTopic('TEST_TOPIC', onStart, onStop);
|
||||
|
||||
// First cycle
|
||||
service.subscribe('TEST_TOPIC');
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
|
||||
expect(onStart).toHaveBeenCalledTimes(1);
|
||||
expect(onStop).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Second cycle - should call onStart again
|
||||
service.subscribe('TEST_TOPIC');
|
||||
expect(onStart).toHaveBeenCalledTimes(2);
|
||||
|
||||
service.unsubscribe('TEST_TOPIC');
|
||||
expect(onStop).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('should handle missing handlers gracefully', () => {
|
||||
// Subscribe without registering handlers
|
||||
expect(() => service.subscribe('UNREGISTERED_TOPIC')).not.toThrow();
|
||||
expect(() => service.unsubscribe('UNREGISTERED_TOPIC')).not.toThrow();
|
||||
|
||||
expect(service.getSubscriberCount('UNREGISTERED_TOPIC')).toBe(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,7 +1,8 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
@Injectable()
|
||||
export class SubscriptionTrackerService {
|
||||
private readonly logger = new Logger(SubscriptionTrackerService.name);
|
||||
private subscriberCounts = new Map<string, number>();
|
||||
private topicHandlers = new Map<string, { onStart: () => void; onStop: () => void }>();
|
||||
|
||||
@@ -11,9 +12,13 @@ export class SubscriptionTrackerService {
|
||||
|
||||
public subscribe(topic: string): void {
|
||||
const currentCount = this.subscriberCounts.get(topic) ?? 0;
|
||||
this.subscriberCounts.set(topic, currentCount + 1);
|
||||
const newCount = currentCount + 1;
|
||||
this.subscriberCounts.set(topic, newCount);
|
||||
|
||||
this.logger.debug(`Subscription added for topic '${topic}': ${newCount} active subscriber(s)`);
|
||||
|
||||
if (currentCount === 0) {
|
||||
this.logger.debug(`Starting topic '${topic}' (first subscriber)`);
|
||||
const handlers = this.topicHandlers.get(topic);
|
||||
if (handlers?.onStart) {
|
||||
handlers.onStart();
|
||||
@@ -21,20 +26,46 @@ export class SubscriptionTrackerService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current subscriber count for a topic
|
||||
* @param topic The topic to check
|
||||
* @returns The number of active subscribers
|
||||
*/
|
||||
public getSubscriberCount(topic: string): number {
|
||||
return this.subscriberCounts.get(topic) ?? 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all active topics and their subscriber counts
|
||||
* @returns A map of topics to subscriber counts
|
||||
*/
|
||||
public getAllSubscriberCounts(): Map<string, number> {
|
||||
return new Map(this.subscriberCounts);
|
||||
}
|
||||
|
||||
public unsubscribe(topic: string): void {
|
||||
const currentCount = this.subscriberCounts.get(topic) ?? 0;
|
||||
|
||||
// Early return for idempotency - if already at 0, do nothing
|
||||
if (currentCount === 0) {
|
||||
this.logger.debug(
|
||||
`Unsubscribe called for topic '${topic}' but no active subscribers (idempotent)`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const newCount = currentCount - 1;
|
||||
|
||||
this.logger.debug(
|
||||
`Subscription removed for topic '${topic}': ${newCount} active subscriber(s) remaining`
|
||||
);
|
||||
|
||||
if (newCount === 0) {
|
||||
// Delete the topic entry when reaching zero
|
||||
this.subscriberCounts.delete(topic);
|
||||
|
||||
this.logger.debug(`Stopping topic '${topic}' (last subscriber removed)`);
|
||||
|
||||
// Call onStop handler if it exists
|
||||
const handlers = this.topicHandlers.get(topic);
|
||||
if (handlers?.onStop) {
|
||||
|
||||
Reference in New Issue
Block a user