mirror of
https://github.com/unraid/api.git
synced 2026-01-02 22:50:02 -06:00
chore: Update API version and refactor pubsub channel references
- Updated API version in api.json from 4.25.3 to 4.27.2. - Refactored pubsub channel references across multiple files to use GRAPHQL_PUBSUB_CHANNEL instead of PUBSUB_CHANNEL, enhancing consistency and clarity in the codebase. - Adjusted related tests to ensure they align with the new pubsub channel structure.
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"version": "4.25.3",
|
||||
"version": "4.27.2",
|
||||
"extraOrigins": [],
|
||||
"sandbox": true,
|
||||
"ssoSubIds": [],
|
||||
|
||||
@@ -7,8 +7,6 @@ import { PubSub } from 'graphql-subscriptions';
|
||||
const eventEmitter = new EventEmitter();
|
||||
eventEmitter.setMaxListeners(30);
|
||||
|
||||
export { GRAPHQL_PUBSUB_CHANNEL as PUBSUB_CHANNEL };
|
||||
|
||||
export const pubsub = new PubSub({ eventEmitter });
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import { isAnyOf } from '@reduxjs/toolkit';
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { isEqual } from 'lodash-es';
|
||||
|
||||
import { logger } from '@app/core/log.js';
|
||||
import { getArrayData } from '@app/core/modules/array/get-array-data.js';
|
||||
import { pubsub, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { pubsub } from '@app/core/pubsub.js';
|
||||
import { startAppListening } from '@app/store/listeners/listener-middleware.js';
|
||||
import { loadSingleStateFile } from '@app/store/modules/emhttp.js';
|
||||
import { StateFileKey } from '@app/store/types.js';
|
||||
@@ -20,14 +21,14 @@ export const enableArrayEventListener = () =>
|
||||
await delay(5_000);
|
||||
const array = getArrayData(getState);
|
||||
if (!isEqual(oldArrayData, array)) {
|
||||
pubsub.publish(PUBSUB_CHANNEL.ARRAY, { array });
|
||||
pubsub.publish(GRAPHQL_PUBSUB_CHANNEL.ARRAY, { array });
|
||||
logger.debug({ event: array }, 'Array was updated, publishing event');
|
||||
}
|
||||
|
||||
subscribe();
|
||||
} else if (action.meta.arg === StateFileKey.var) {
|
||||
if (!isEqual(getOriginalState().emhttp.var?.name, getState().emhttp.var?.name)) {
|
||||
await pubsub.publish(PUBSUB_CHANNEL.INFO, {
|
||||
await pubsub.publish(GRAPHQL_PUBSUB_CHANNEL.INFO, {
|
||||
info: {
|
||||
os: {
|
||||
hostname: getState().emhttp.var?.name,
|
||||
|
||||
@@ -4,14 +4,20 @@ import { join } from 'node:path';
|
||||
|
||||
import { afterAll, beforeAll, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { LogService } from '@app/unraid-api/cli/log.service.js';
|
||||
|
||||
const logger = {
|
||||
clear: vi.fn(),
|
||||
shouldLog: vi.fn(() => true),
|
||||
table: vi.fn(),
|
||||
trace: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
log: vi.fn(),
|
||||
info: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
} as const;
|
||||
always: vi.fn(),
|
||||
} as unknown as LogService;
|
||||
|
||||
describe('NodemonService (real nodemon)', () => {
|
||||
const tmpRoot = join(tmpdir(), 'nodemon-service-');
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import { Query, Resolver, Subscription } from '@nestjs/graphql';
|
||||
|
||||
import { AuthAction, Resource } from '@unraid/shared/graphql.model.js';
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { UsePermissions } from '@unraid/shared/use-permissions.directive.js';
|
||||
|
||||
import { createSubscription, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { createSubscription } from '@app/core/pubsub.js';
|
||||
import { UnraidArray } from '@app/unraid-api/graph/resolvers/array/array.model.js';
|
||||
import { ArrayService } from '@app/unraid-api/graph/resolvers/array/array.service.js';
|
||||
|
||||
@@ -26,6 +27,6 @@ export class ArrayResolver {
|
||||
resource: Resource.ARRAY,
|
||||
})
|
||||
public async arraySubscription() {
|
||||
return createSubscription(PUBSUB_CHANNEL.ARRAY);
|
||||
return createSubscription(GRAPHQL_PUBSUB_CHANNEL.ARRAY);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import { Query, Resolver, Subscription } from '@nestjs/graphql';
|
||||
|
||||
import { AuthAction, Resource } from '@unraid/shared/graphql.model.js';
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { UsePermissions } from '@unraid/shared/use-permissions.directive.js';
|
||||
import { PubSub } from 'graphql-subscriptions';
|
||||
|
||||
import { PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { ArrayService } from '@app/unraid-api/graph/resolvers/array/array.service.js';
|
||||
import { ParityCheck } from '@app/unraid-api/graph/resolvers/array/parity.model.js';
|
||||
import { ParityService } from '@app/unraid-api/graph/resolvers/array/parity.service.js';
|
||||
@@ -33,6 +33,6 @@ export class ParityResolver {
|
||||
})
|
||||
@Subscription(() => ParityCheck)
|
||||
parityHistorySubscription() {
|
||||
return pubSub.asyncIterableIterator(PUBSUB_CHANNEL.PARITY);
|
||||
return pubSub.asyncIterableIterator(GRAPHQL_PUBSUB_CHANNEL.PARITY);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { TestingModule } from '@nestjs/testing';
|
||||
import { Test } from '@nestjs/testing';
|
||||
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { DisplayResolver } from '@app/unraid-api/graph/resolvers/display/display.resolver.js';
|
||||
@@ -9,9 +10,6 @@ import { DisplayService } from '@app/unraid-api/graph/resolvers/info/display/dis
|
||||
// Mock the pubsub module
|
||||
vi.mock('@app/core/pubsub.js', () => ({
|
||||
createSubscription: vi.fn().mockReturnValue('mock-subscription'),
|
||||
PUBSUB_CHANNEL: {
|
||||
DISPLAY: 'display',
|
||||
},
|
||||
}));
|
||||
|
||||
describe('DisplayResolver', () => {
|
||||
@@ -80,11 +78,11 @@ describe('DisplayResolver', () => {
|
||||
|
||||
describe('displaySubscription', () => {
|
||||
it('should create and return subscription', async () => {
|
||||
const { createSubscription, PUBSUB_CHANNEL } = await import('@app/core/pubsub.js');
|
||||
const { createSubscription } = await import('@app/core/pubsub.js');
|
||||
|
||||
const result = await resolver.displaySubscription();
|
||||
|
||||
expect(createSubscription).toHaveBeenCalledWith(PUBSUB_CHANNEL.DISPLAY);
|
||||
expect(createSubscription).toHaveBeenCalledWith(GRAPHQL_PUBSUB_CHANNEL.DISPLAY);
|
||||
expect(result).toBe('mock-subscription');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import { Query, Resolver, Subscription } from '@nestjs/graphql';
|
||||
|
||||
import { AuthAction, Resource } from '@unraid/shared/graphql.model.js';
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { UsePermissions } from '@unraid/shared/use-permissions.directive.js';
|
||||
|
||||
import { createSubscription, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { createSubscription } from '@app/core/pubsub.js';
|
||||
import { Display } from '@app/unraid-api/graph/resolvers/info/display/display.model.js';
|
||||
import { DisplayService } from '@app/unraid-api/graph/resolvers/info/display/display.service.js';
|
||||
|
||||
@@ -26,6 +27,6 @@ export class DisplayResolver {
|
||||
resource: Resource.DISPLAY,
|
||||
})
|
||||
public async displaySubscription() {
|
||||
return createSubscription(PUBSUB_CHANNEL.DISPLAY);
|
||||
return createSubscription(GRAPHQL_PUBSUB_CHANNEL.DISPLAY);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,11 +2,12 @@ import { Logger } from '@nestjs/common';
|
||||
import { Test, TestingModule } from '@nestjs/testing';
|
||||
import { PassThrough, Readable } from 'stream';
|
||||
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import Docker from 'dockerode';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
// Import pubsub for use in tests
|
||||
import { pubsub, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { pubsub } from '@app/core/pubsub.js';
|
||||
import { DockerEventService } from '@app/unraid-api/graph/resolvers/docker/docker-event.service.js';
|
||||
import { DockerService } from '@app/unraid-api/graph/resolvers/docker/docker.service.js';
|
||||
|
||||
@@ -46,9 +47,6 @@ vi.mock('@app/core/pubsub.js', () => ({
|
||||
pubsub: {
|
||||
publish: vi.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
PUBSUB_CHANNEL: {
|
||||
INFO: 'info',
|
||||
},
|
||||
}));
|
||||
|
||||
// Mock DockerService
|
||||
@@ -140,7 +138,7 @@ describe('DockerEventService', () => {
|
||||
|
||||
expect(dockerService.clearContainerCache).toHaveBeenCalled();
|
||||
expect(dockerService.getAppInfo).toHaveBeenCalled();
|
||||
expect(pubsub.publish).toHaveBeenCalledWith(PUBSUB_CHANNEL.INFO, expect.any(Object));
|
||||
expect(pubsub.publish).toHaveBeenCalledWith(GRAPHQL_PUBSUB_CHANNEL.INFO, expect.any(Object));
|
||||
});
|
||||
|
||||
it('should ignore non-watched actions', async () => {
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { watch } from 'chokidar';
|
||||
import Docker from 'dockerode';
|
||||
|
||||
import { pubsub, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { pubsub } from '@app/core/pubsub.js';
|
||||
import { getters } from '@app/store/index.js';
|
||||
import { DockerService } from '@app/unraid-api/graph/resolvers/docker/docker.service.js';
|
||||
|
||||
@@ -132,7 +133,7 @@ export class DockerEventService implements OnModuleDestroy, OnModuleInit {
|
||||
await this.dockerService.clearContainerCache();
|
||||
// Get updated counts and publish
|
||||
const appInfo = await this.dockerService.getAppInfo();
|
||||
await pubsub.publish(PUBSUB_CHANNEL.INFO, appInfo);
|
||||
await pubsub.publish(GRAPHQL_PUBSUB_CHANNEL.INFO, appInfo);
|
||||
this.logger.debug(`Published app info update due to event: ${actionName}`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,11 +2,12 @@ import type { TestingModule } from '@nestjs/testing';
|
||||
import { CACHE_MANAGER } from '@nestjs/cache-manager';
|
||||
import { Test } from '@nestjs/testing';
|
||||
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import Docker from 'dockerode';
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
// Import the mocked pubsub parts
|
||||
import { pubsub, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { pubsub } from '@app/core/pubsub.js';
|
||||
import { ContainerState, DockerContainer } from '@app/unraid-api/graph/resolvers/docker/docker.model.js';
|
||||
import { DockerService } from '@app/unraid-api/graph/resolvers/docker/docker.service.js';
|
||||
|
||||
@@ -15,7 +16,7 @@ vi.mock('@app/core/pubsub.js', () => ({
|
||||
pubsub: {
|
||||
publish: vi.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
PUBSUB_CHANNEL: {
|
||||
GRAPHQL_PUBSUB_CHANNEL: {
|
||||
INFO: 'info',
|
||||
},
|
||||
}));
|
||||
@@ -274,7 +275,7 @@ describe('DockerService', () => {
|
||||
expect(mockCacheManager.del).toHaveBeenCalledWith(DockerService.CONTAINER_CACHE_KEY);
|
||||
expect(mockListContainers).toHaveBeenCalled();
|
||||
expect(mockCacheManager.set).toHaveBeenCalled();
|
||||
expect(pubsub.publish).toHaveBeenCalledWith(PUBSUB_CHANNEL.INFO, {
|
||||
expect(pubsub.publish).toHaveBeenCalledWith(GRAPHQL_PUBSUB_CHANNEL.INFO, {
|
||||
info: {
|
||||
apps: { installed: 1, running: 1 },
|
||||
},
|
||||
@@ -332,7 +333,7 @@ describe('DockerService', () => {
|
||||
expect(mockCacheManager.del).toHaveBeenCalledWith(DockerService.CONTAINER_CACHE_KEY);
|
||||
expect(mockListContainers).toHaveBeenCalled();
|
||||
expect(mockCacheManager.set).toHaveBeenCalled();
|
||||
expect(pubsub.publish).toHaveBeenCalledWith(PUBSUB_CHANNEL.INFO, {
|
||||
expect(pubsub.publish).toHaveBeenCalledWith(GRAPHQL_PUBSUB_CHANNEL.INFO, {
|
||||
info: {
|
||||
apps: { installed: 1, running: 0 },
|
||||
},
|
||||
|
||||
@@ -2,10 +2,11 @@ import { CACHE_MANAGER } from '@nestjs/cache-manager';
|
||||
import { Inject, Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { readFile } from 'fs/promises';
|
||||
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { type Cache } from 'cache-manager';
|
||||
import Docker from 'dockerode';
|
||||
|
||||
import { pubsub, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { pubsub } from '@app/core/pubsub.js';
|
||||
import { catchHandlers } from '@app/core/utils/misc/catch-handlers.js';
|
||||
import { sleep } from '@app/core/utils/misc/sleep.js';
|
||||
import { getters } from '@app/store/index.js';
|
||||
@@ -210,7 +211,7 @@ export class DockerService {
|
||||
throw new Error(`Container ${id} not found after starting`);
|
||||
}
|
||||
const appInfo = await this.getAppInfo();
|
||||
await pubsub.publish(PUBSUB_CHANNEL.INFO, appInfo);
|
||||
await pubsub.publish(GRAPHQL_PUBSUB_CHANNEL.INFO, appInfo);
|
||||
return updatedContainer;
|
||||
}
|
||||
|
||||
@@ -240,7 +241,7 @@ export class DockerService {
|
||||
this.logger.warn(`Container ${id} did not reach EXITED state after stop command.`);
|
||||
}
|
||||
const appInfo = await this.getAppInfo();
|
||||
await pubsub.publish(PUBSUB_CHANNEL.INFO, appInfo);
|
||||
await pubsub.publish(GRAPHQL_PUBSUB_CHANNEL.INFO, appInfo);
|
||||
return updatedContainer;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,10 @@ import type { TestingModule } from '@nestjs/testing';
|
||||
import { ScheduleModule } from '@nestjs/schedule';
|
||||
import { Test } from '@nestjs/testing';
|
||||
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { pubsub, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { pubsub } from '@app/core/pubsub.js';
|
||||
import { CpuTopologyService } from '@app/unraid-api/graph/resolvers/info/cpu/cpu-topology.service.js';
|
||||
import { CpuService } from '@app/unraid-api/graph/resolvers/info/cpu/cpu.service.js';
|
||||
import { MemoryService } from '@app/unraid-api/graph/resolvers/info/memory/memory.service.js';
|
||||
@@ -107,7 +108,7 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
});
|
||||
|
||||
// Trigger polling by simulating subscription
|
||||
trackerService.subscribe(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
trackerService.subscribe(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
// Wait a bit for potential multiple executions
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
@@ -141,7 +142,7 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
});
|
||||
|
||||
// Trigger polling by simulating subscription
|
||||
trackerService.subscribe(PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
trackerService.subscribe(GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
|
||||
// Wait a bit for potential multiple executions
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
@@ -155,13 +156,13 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
const trackerService = module.get<SubscriptionTrackerService>(SubscriptionTrackerService);
|
||||
|
||||
// Trigger polling by starting subscription
|
||||
trackerService.subscribe(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
trackerService.subscribe(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
// Wait for the polling interval to trigger (1000ms for CPU)
|
||||
await new Promise((resolve) => setTimeout(resolve, 1100));
|
||||
|
||||
expect(publishSpy).toHaveBeenCalledWith(
|
||||
PUBSUB_CHANNEL.CPU_UTILIZATION,
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION,
|
||||
expect.objectContaining({
|
||||
systemMetricsCpu: expect.objectContaining({
|
||||
id: 'info/cpu-load',
|
||||
@@ -171,7 +172,7 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
})
|
||||
);
|
||||
|
||||
trackerService.unsubscribe(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
trackerService.unsubscribe(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
publishSpy.mockRestore();
|
||||
});
|
||||
|
||||
@@ -180,13 +181,13 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
const trackerService = module.get<SubscriptionTrackerService>(SubscriptionTrackerService);
|
||||
|
||||
// Trigger polling by starting subscription
|
||||
trackerService.subscribe(PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
trackerService.subscribe(GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
|
||||
// Wait for the polling interval to trigger (2000ms for memory)
|
||||
await new Promise((resolve) => setTimeout(resolve, 2100));
|
||||
|
||||
expect(publishSpy).toHaveBeenCalledWith(
|
||||
PUBSUB_CHANNEL.MEMORY_UTILIZATION,
|
||||
GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION,
|
||||
expect.objectContaining({
|
||||
systemMetricsMemory: expect.objectContaining({
|
||||
id: 'memory-utilization',
|
||||
@@ -197,7 +198,7 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
})
|
||||
);
|
||||
|
||||
trackerService.unsubscribe(PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
trackerService.unsubscribe(GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
publishSpy.mockRestore();
|
||||
});
|
||||
|
||||
@@ -214,7 +215,7 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
vi.spyOn(service, 'generateCpuLoad').mockRejectedValueOnce(new Error('CPU error'));
|
||||
|
||||
// Trigger polling
|
||||
trackerService.subscribe(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
trackerService.subscribe(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
|
||||
// Wait for polling interval to trigger and handle error (1000ms for CPU)
|
||||
await new Promise((resolve) => setTimeout(resolve, 1100));
|
||||
@@ -224,7 +225,7 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
expect.any(Error)
|
||||
);
|
||||
|
||||
trackerService.unsubscribe(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
trackerService.unsubscribe(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
loggerSpy.mockRestore();
|
||||
});
|
||||
|
||||
@@ -241,7 +242,7 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
vi.spyOn(service, 'generateMemoryLoad').mockRejectedValueOnce(new Error('Memory error'));
|
||||
|
||||
// Trigger polling
|
||||
trackerService.subscribe(PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
trackerService.subscribe(GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
|
||||
// Wait for polling interval to trigger and handle error (2000ms for memory)
|
||||
await new Promise((resolve) => setTimeout(resolve, 2100));
|
||||
@@ -251,7 +252,7 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
expect.any(Error)
|
||||
);
|
||||
|
||||
trackerService.unsubscribe(PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
trackerService.unsubscribe(GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
loggerSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
@@ -263,26 +264,30 @@ describe('MetricsResolver Integration Tests', () => {
|
||||
module.get<SubscriptionManagerService>(SubscriptionManagerService);
|
||||
|
||||
// Start polling
|
||||
trackerService.subscribe(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
trackerService.subscribe(PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
trackerService.subscribe(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
trackerService.subscribe(GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
|
||||
// Wait a bit for subscriptions to be fully set up
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
// Verify subscriptions are active
|
||||
expect(subscriptionManager.isSubscriptionActive(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(true);
|
||||
expect(subscriptionManager.isSubscriptionActive(PUBSUB_CHANNEL.MEMORY_UTILIZATION)).toBe(
|
||||
true
|
||||
);
|
||||
expect(
|
||||
subscriptionManager.isSubscriptionActive(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)
|
||||
).toBe(true);
|
||||
expect(
|
||||
subscriptionManager.isSubscriptionActive(GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION)
|
||||
).toBe(true);
|
||||
|
||||
// Clean up the module
|
||||
await module.close();
|
||||
|
||||
// Subscriptions should be cleaned up
|
||||
expect(subscriptionManager.isSubscriptionActive(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(false);
|
||||
expect(subscriptionManager.isSubscriptionActive(PUBSUB_CHANNEL.MEMORY_UTILIZATION)).toBe(
|
||||
false
|
||||
);
|
||||
expect(
|
||||
subscriptionManager.isSubscriptionActive(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)
|
||||
).toBe(false);
|
||||
expect(
|
||||
subscriptionManager.isSubscriptionActive(GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION)
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -2,9 +2,10 @@ import { Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { Query, ResolveField, Resolver, Subscription } from '@nestjs/graphql';
|
||||
|
||||
import { AuthAction, Resource } from '@unraid/shared/graphql.model.js';
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { UsePermissions } from '@unraid/shared/use-permissions.directive.js';
|
||||
|
||||
import { pubsub, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { pubsub } from '@app/core/pubsub.js';
|
||||
import { CpuTopologyService } from '@app/unraid-api/graph/resolvers/info/cpu/cpu-topology.service.js';
|
||||
import { CpuPackages, CpuUtilization } from '@app/unraid-api/graph/resolvers/info/cpu/cpu.model.js';
|
||||
import { CpuService } from '@app/unraid-api/graph/resolvers/info/cpu/cpu.service.js';
|
||||
@@ -28,16 +29,16 @@ export class MetricsResolver implements OnModuleInit {
|
||||
onModuleInit() {
|
||||
// Register CPU polling with 1 second interval
|
||||
this.subscriptionTracker.registerTopic(
|
||||
PUBSUB_CHANNEL.CPU_UTILIZATION,
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION,
|
||||
async () => {
|
||||
const payload = await this.cpuService.generateCpuLoad();
|
||||
pubsub.publish(PUBSUB_CHANNEL.CPU_UTILIZATION, { systemMetricsCpu: payload });
|
||||
pubsub.publish(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION, { systemMetricsCpu: payload });
|
||||
},
|
||||
1000
|
||||
);
|
||||
|
||||
this.subscriptionTracker.registerTopic(
|
||||
PUBSUB_CHANNEL.CPU_TELEMETRY,
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_TELEMETRY,
|
||||
async () => {
|
||||
const packageList = (await this.cpuTopologyService.generateTelemetry()) ?? [];
|
||||
|
||||
@@ -59,7 +60,7 @@ export class MetricsResolver implements OnModuleInit {
|
||||
this.logger.debug(`CPU_TELEMETRY payload: ${JSON.stringify(packages)}`);
|
||||
|
||||
// Publish the payload
|
||||
pubsub.publish(PUBSUB_CHANNEL.CPU_TELEMETRY, {
|
||||
pubsub.publish(GRAPHQL_PUBSUB_CHANNEL.CPU_TELEMETRY, {
|
||||
systemMetricsCpuTelemetry: packages,
|
||||
});
|
||||
|
||||
@@ -70,10 +71,12 @@ export class MetricsResolver implements OnModuleInit {
|
||||
|
||||
// Register memory polling with 2 second interval
|
||||
this.subscriptionTracker.registerTopic(
|
||||
PUBSUB_CHANNEL.MEMORY_UTILIZATION,
|
||||
GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION,
|
||||
async () => {
|
||||
const payload = await this.memoryService.generateMemoryLoad();
|
||||
pubsub.publish(PUBSUB_CHANNEL.MEMORY_UTILIZATION, { systemMetricsMemory: payload });
|
||||
pubsub.publish(GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION, {
|
||||
systemMetricsMemory: payload,
|
||||
});
|
||||
},
|
||||
2000
|
||||
);
|
||||
@@ -109,7 +112,7 @@ export class MetricsResolver implements OnModuleInit {
|
||||
resource: Resource.INFO,
|
||||
})
|
||||
public async systemMetricsCpuSubscription() {
|
||||
return this.subscriptionHelper.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
return this.subscriptionHelper.createTrackedSubscription(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
}
|
||||
|
||||
@Subscription(() => CpuPackages, {
|
||||
@@ -121,7 +124,7 @@ export class MetricsResolver implements OnModuleInit {
|
||||
resource: Resource.INFO,
|
||||
})
|
||||
public async systemMetricsCpuTelemetrySubscription() {
|
||||
return this.subscriptionHelper.createTrackedSubscription(PUBSUB_CHANNEL.CPU_TELEMETRY);
|
||||
return this.subscriptionHelper.createTrackedSubscription(GRAPHQL_PUBSUB_CHANNEL.CPU_TELEMETRY);
|
||||
}
|
||||
|
||||
@Subscription(() => MemoryUtilization, {
|
||||
@@ -133,6 +136,8 @@ export class MetricsResolver implements OnModuleInit {
|
||||
resource: Resource.INFO,
|
||||
})
|
||||
public async systemMetricsMemorySubscription() {
|
||||
return this.subscriptionHelper.createTrackedSubscription(PUBSUB_CHANNEL.MEMORY_UTILIZATION);
|
||||
return this.subscriptionHelper.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.MEMORY_UTILIZATION
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ vi.mock('@app/core/pubsub.js', () => ({
|
||||
pubsub: {
|
||||
publish: vi.fn(),
|
||||
},
|
||||
PUBSUB_CHANNEL: {
|
||||
GRAPHQL_PUBSUB_CHANNEL: {
|
||||
NOTIFICATION_OVERVIEW: 'notification_overview',
|
||||
NOTIFICATION_ADDED: 'notification_added',
|
||||
},
|
||||
|
||||
@@ -2,10 +2,11 @@ import { Args, Mutation, Query, ResolveField, Resolver, Subscription } from '@ne
|
||||
|
||||
import { AuthAction, Resource } from '@unraid/shared/graphql.model.js';
|
||||
import { PrefixedID } from '@unraid/shared/prefixed-id-scalar.js';
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { UsePermissions } from '@unraid/shared/use-permissions.directive.js';
|
||||
|
||||
import { AppError } from '@app/core/errors/app-error.js';
|
||||
import { createSubscription, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { createSubscription } from '@app/core/pubsub.js';
|
||||
import {
|
||||
Notification,
|
||||
NotificationData,
|
||||
@@ -152,7 +153,7 @@ export class NotificationsResolver {
|
||||
resource: Resource.NOTIFICATIONS,
|
||||
})
|
||||
async notificationAdded() {
|
||||
return createSubscription(PUBSUB_CHANNEL.NOTIFICATION_ADDED);
|
||||
return createSubscription(GRAPHQL_PUBSUB_CHANNEL.NOTIFICATION_ADDED);
|
||||
}
|
||||
|
||||
@Subscription(() => NotificationOverview)
|
||||
@@ -161,6 +162,6 @@ export class NotificationsResolver {
|
||||
resource: Resource.NOTIFICATIONS,
|
||||
})
|
||||
async notificationsOverview() {
|
||||
return createSubscription(PUBSUB_CHANNEL.NOTIFICATION_OVERVIEW);
|
||||
return createSubscription(GRAPHQL_PUBSUB_CHANNEL.NOTIFICATION_OVERVIEW);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ import { readdir, readFile, rename, stat, unlink, writeFile } from 'fs/promises'
|
||||
import { basename, join } from 'path';
|
||||
|
||||
import type { Stats } from 'fs';
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { FSWatcher, watch } from 'chokidar';
|
||||
import { ValidationError } from 'class-validator';
|
||||
import { execa } from 'execa';
|
||||
@@ -12,7 +13,7 @@ import { encode as encodeIni } from 'ini';
|
||||
import { v7 as uuidv7 } from 'uuid';
|
||||
|
||||
import { AppError } from '@app/core/errors/app-error.js';
|
||||
import { pubsub, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { pubsub } from '@app/core/pubsub.js';
|
||||
import { NotificationIni } from '@app/core/types/states/notification.js';
|
||||
import { fileExists } from '@app/core/utils/files/file-exists.js';
|
||||
import { parseConfig } from '@app/core/utils/misc/parse-config.js';
|
||||
@@ -118,7 +119,7 @@ export class NotificationsService {
|
||||
|
||||
if (type === NotificationType.UNREAD) {
|
||||
this.publishOverview();
|
||||
pubsub.publish(PUBSUB_CHANNEL.NOTIFICATION_ADDED, {
|
||||
pubsub.publish(GRAPHQL_PUBSUB_CHANNEL.NOTIFICATION_ADDED, {
|
||||
notificationAdded: notification,
|
||||
});
|
||||
}
|
||||
@@ -137,7 +138,7 @@ export class NotificationsService {
|
||||
}
|
||||
|
||||
private publishOverview(overview = NotificationsService.overview) {
|
||||
return pubsub.publish(PUBSUB_CHANNEL.NOTIFICATION_OVERVIEW, {
|
||||
return pubsub.publish(GRAPHQL_PUBSUB_CHANNEL.NOTIFICATION_OVERVIEW, {
|
||||
notificationsOverview: overview,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2,9 +2,10 @@ import { ConfigService } from '@nestjs/config';
|
||||
import { Query, Resolver, Subscription } from '@nestjs/graphql';
|
||||
|
||||
import { AuthAction, Resource } from '@unraid/shared/graphql.model.js';
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { UsePermissions } from '@unraid/shared/use-permissions.directive.js';
|
||||
|
||||
import { createSubscription, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { createSubscription } from '@app/core/pubsub.js';
|
||||
import { Owner } from '@app/unraid-api/graph/resolvers/owner/owner.model.js';
|
||||
|
||||
// Question: should we move this into the connect plugin, or should this always be available?
|
||||
@@ -39,6 +40,6 @@ export class OwnerResolver {
|
||||
resource: Resource.OWNER,
|
||||
})
|
||||
public ownerSubscription() {
|
||||
return createSubscription(PUBSUB_CHANNEL.OWNER);
|
||||
return createSubscription(GRAPHQL_PUBSUB_CHANNEL.OWNER);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,9 +3,10 @@ import { ConfigService } from '@nestjs/config';
|
||||
import { Query, Resolver, Subscription } from '@nestjs/graphql';
|
||||
|
||||
import { AuthAction, Resource } from '@unraid/shared/graphql.model.js';
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { UsePermissions } from '@unraid/shared/use-permissions.directive.js';
|
||||
|
||||
import { createSubscription, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { createSubscription } from '@app/core/pubsub.js';
|
||||
import { getters } from '@app/store/index.js';
|
||||
import { MinigraphStatus } from '@app/unraid-api/graph/resolvers/cloud/cloud.model.js';
|
||||
import {
|
||||
@@ -42,7 +43,7 @@ export class ServerResolver {
|
||||
resource: Resource.SERVERS,
|
||||
})
|
||||
public async serversSubscription() {
|
||||
return createSubscription(PUBSUB_CHANNEL.SERVERS);
|
||||
return createSubscription(GRAPHQL_PUBSUB_CHANNEL.SERVERS);
|
||||
}
|
||||
|
||||
private getLocalServer(): ServerModel {
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import { Logger } from '@nestjs/common';
|
||||
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
import { PubSub } from 'graphql-subscriptions';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { pubsub, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { pubsub } from '@app/core/pubsub.js';
|
||||
import { SubscriptionHelperService } from '@app/unraid-api/graph/services/subscription-helper.service.js';
|
||||
import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subscription-tracker.service.js';
|
||||
|
||||
@@ -28,7 +29,9 @@ describe('SubscriptionHelperService', () => {
|
||||
|
||||
describe('createTrackedSubscription', () => {
|
||||
it('should create an async iterator that tracks subscriptions', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
const iterator = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
|
||||
expect(iterator).toBeDefined();
|
||||
expect(iterator.next).toBeDefined();
|
||||
@@ -37,29 +40,35 @@ describe('SubscriptionHelperService', () => {
|
||||
expect(iterator[Symbol.asyncIterator]).toBeDefined();
|
||||
|
||||
// Should have subscribed
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
});
|
||||
|
||||
it('should return itself when Symbol.asyncIterator is called', () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
const iterator = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
|
||||
expect(iterator[Symbol.asyncIterator]()).toBe(iterator);
|
||||
});
|
||||
|
||||
it('should unsubscribe when return() is called', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
const iterator = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
|
||||
await iterator.return?.();
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
});
|
||||
|
||||
it('should unsubscribe when throw() is called', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
const iterator = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
|
||||
try {
|
||||
await iterator.throw?.(new Error('Test error'));
|
||||
@@ -67,14 +76,14 @@ describe('SubscriptionHelperService', () => {
|
||||
// Expected to throw
|
||||
}
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('integration with pubsub', () => {
|
||||
it('should receive published messages', async () => {
|
||||
const iterator = helperService.createTrackedSubscription<{ cpuUtilization: any }>(
|
||||
PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
|
||||
const testData = {
|
||||
@@ -92,7 +101,7 @@ describe('SubscriptionHelperService', () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
// Publish a message
|
||||
await (pubsub as PubSub).publish(PUBSUB_CHANNEL.CPU_UTILIZATION, testData);
|
||||
await (pubsub as PubSub).publish(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION, testData);
|
||||
|
||||
// Wait for the message
|
||||
const result = await consumePromise;
|
||||
@@ -107,21 +116,27 @@ describe('SubscriptionHelperService', () => {
|
||||
// Register handlers to verify start/stop behavior
|
||||
const onStart = vi.fn();
|
||||
const onStop = vi.fn();
|
||||
trackerService.registerTopic(PUBSUB_CHANNEL.CPU_UTILIZATION, onStart, onStop);
|
||||
trackerService.registerTopic(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION, onStart, onStop);
|
||||
|
||||
// Create first subscriber
|
||||
const iterator1 = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
const iterator1 = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(onStart).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Create second subscriber
|
||||
const iterator2 = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(2);
|
||||
const iterator2 = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(2);
|
||||
expect(onStart).toHaveBeenCalledTimes(1); // Should not call again
|
||||
|
||||
// Create third subscriber
|
||||
const iterator3 = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(3);
|
||||
const iterator3 = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(3);
|
||||
|
||||
// Set up consumption promises first
|
||||
const consume1 = iterator1.next();
|
||||
@@ -133,7 +148,7 @@ describe('SubscriptionHelperService', () => {
|
||||
|
||||
// Publish a message - all should receive it
|
||||
const testData = { cpuUtilization: { id: 'test', load: 75, cpus: [] } };
|
||||
await (pubsub as PubSub).publish(PUBSUB_CHANNEL.CPU_UTILIZATION, testData);
|
||||
await (pubsub as PubSub).publish(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION, testData);
|
||||
|
||||
const [result1, result2, result3] = await Promise.all([consume1, consume2, consume3]);
|
||||
|
||||
@@ -143,17 +158,17 @@ describe('SubscriptionHelperService', () => {
|
||||
|
||||
// Clean up first subscriber
|
||||
await iterator1.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(2);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(2);
|
||||
expect(onStop).not.toHaveBeenCalled();
|
||||
|
||||
// Clean up second subscriber
|
||||
await iterator2.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(onStop).not.toHaveBeenCalled();
|
||||
|
||||
// Clean up last subscriber - should trigger onStop
|
||||
await iterator3.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(onStop).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
@@ -161,18 +176,26 @@ describe('SubscriptionHelperService', () => {
|
||||
const iterations = 10;
|
||||
|
||||
for (let i = 0; i < iterations; i++) {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
const iterator = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(
|
||||
1
|
||||
);
|
||||
|
||||
await iterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(
|
||||
0
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
it('should properly clean up on error', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
const iterator = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
|
||||
const testError = new Error('Test error');
|
||||
|
||||
@@ -183,13 +206,15 @@ describe('SubscriptionHelperService', () => {
|
||||
expect(error).toBe(testError);
|
||||
}
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
});
|
||||
|
||||
it('should log debug messages for subscription lifecycle', async () => {
|
||||
vi.clearAllMocks();
|
||||
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
const iterator = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('Subscription added for topic')
|
||||
@@ -205,9 +230,9 @@ describe('SubscriptionHelperService', () => {
|
||||
|
||||
describe('different topic types', () => {
|
||||
it('should handle INFO channel subscriptions', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.INFO);
|
||||
const iterator = helperService.createTrackedSubscription(GRAPHQL_PUBSUB_CHANNEL.INFO);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
|
||||
// Set up consumption promise first
|
||||
const consumePromise = iterator.next();
|
||||
@@ -216,47 +241,51 @@ describe('SubscriptionHelperService', () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
|
||||
const testData = { info: { id: 'test-info' } };
|
||||
await (pubsub as PubSub).publish(PUBSUB_CHANNEL.INFO, testData);
|
||||
await (pubsub as PubSub).publish(GRAPHQL_PUBSUB_CHANNEL.INFO, testData);
|
||||
|
||||
const result = await consumePromise;
|
||||
expect(result.value).toEqual(testData);
|
||||
|
||||
await iterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.INFO)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.INFO)).toBe(0);
|
||||
});
|
||||
|
||||
it('should track multiple different topics independently', async () => {
|
||||
const cpuIterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
const infoIterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.INFO);
|
||||
const cpuIterator = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
const infoIterator = helperService.createTrackedSubscription(GRAPHQL_PUBSUB_CHANNEL.INFO);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
|
||||
const allCounts = trackerService.getAllSubscriberCounts();
|
||||
expect(allCounts.get(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(allCounts.get(PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
expect(allCounts.get(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(allCounts.get(GRAPHQL_PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
|
||||
await cpuIterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.INFO)).toBe(1);
|
||||
|
||||
await infoIterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.INFO)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.INFO)).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('edge cases', () => {
|
||||
it('should handle return() called multiple times', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
const iterator = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(1);
|
||||
|
||||
await iterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
|
||||
// Second return should be idempotent
|
||||
await iterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
|
||||
// Check that idempotent message was logged
|
||||
expect(loggerSpy).toHaveBeenCalledWith(
|
||||
@@ -265,7 +294,9 @@ describe('SubscriptionHelperService', () => {
|
||||
});
|
||||
|
||||
it('should handle async iterator protocol correctly', async () => {
|
||||
const iterator = helperService.createTrackedSubscription(PUBSUB_CHANNEL.CPU_UTILIZATION);
|
||||
const iterator = helperService.createTrackedSubscription(
|
||||
GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION
|
||||
);
|
||||
|
||||
// Test that it works in for-await loop (would use Symbol.asyncIterator)
|
||||
const receivedMessages: any[] = [];
|
||||
@@ -285,7 +316,7 @@ describe('SubscriptionHelperService', () => {
|
||||
|
||||
// Publish messages
|
||||
for (let i = 0; i < maxMessages; i++) {
|
||||
await (pubsub as PubSub).publish(PUBSUB_CHANNEL.CPU_UTILIZATION, {
|
||||
await (pubsub as PubSub).publish(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION, {
|
||||
cpuUtilization: { id: `test-${i}`, load: i * 10, cpus: [] },
|
||||
});
|
||||
}
|
||||
@@ -300,7 +331,7 @@ describe('SubscriptionHelperService', () => {
|
||||
|
||||
// Clean up
|
||||
await iterator.return?.();
|
||||
expect(trackerService.getSubscriberCount(PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
expect(trackerService.getSubscriberCount(GRAPHQL_PUBSUB_CHANNEL.CPU_UTILIZATION)).toBe(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { createSubscription, PUBSUB_CHANNEL } from '@app/core/pubsub.js';
|
||||
import { GRAPHQL_PUBSUB_CHANNEL } from '@unraid/shared/pubsub/graphql.pubsub.js';
|
||||
|
||||
import { createSubscription } from '@app/core/pubsub.js';
|
||||
import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subscription-tracker.service.js';
|
||||
|
||||
/**
|
||||
@@ -21,7 +23,7 @@ import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subsc
|
||||
* \@Subscription(() => MetricsUpdate)
|
||||
* async metricsSubscription() {
|
||||
* // Topic must be registered first via SubscriptionTrackerService
|
||||
* return this.subscriptionHelper.createTrackedSubscription(PUBSUB_CHANNEL.METRICS);
|
||||
* return this.subscriptionHelper.createTrackedSubscription(GRAPHQL_PUBSUB_CHANNEL.METRICS);
|
||||
* }
|
||||
*/
|
||||
@Injectable()
|
||||
@@ -33,7 +35,9 @@ export class SubscriptionHelperService {
|
||||
* @param topic The subscription topic/channel to subscribe to
|
||||
* @returns A proxy async iterator with automatic cleanup
|
||||
*/
|
||||
public createTrackedSubscription<T = any>(topic: PUBSUB_CHANNEL | string): AsyncIterableIterator<T> {
|
||||
public createTrackedSubscription<T = any>(
|
||||
topic: GRAPHQL_PUBSUB_CHANNEL | string
|
||||
): AsyncIterableIterator<T> {
|
||||
const innerIterator = createSubscription<T>(topic);
|
||||
|
||||
// Subscribe when the subscription starts
|
||||
|
||||
Reference in New Issue
Block a user