fix: refactor API client to support Unix socket connections (#1575)

This commit is contained in:
Eli Bosley
2025-08-13 16:15:15 -04:00
committed by GitHub
parent b3216874fa
commit a2c5d2495f
31 changed files with 2178 additions and 213 deletions

View File

@@ -62,6 +62,7 @@
"rxjs": "7.8.2",
"type-fest": "4.41.0",
"typescript": "5.9.2",
"undici": "^7.13.0",
"vitest": "3.2.4",
"ws": "8.18.3",
"zen-observable-ts": "1.1.0"
@@ -97,6 +98,7 @@
"lodash-es": "4.17.21",
"nest-authz": "2.17.0",
"rxjs": "7.8.2",
"undici": "^7.13.0",
"ws": "8.18.3",
"zen-observable-ts": "1.1.0"
}

View File

@@ -0,0 +1,275 @@
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { InternalClientService } from './internal.client.js';
describe('InternalClientService', () => {
let service: InternalClientService;
let clientFactory: any;
let apiKeyService: any;
const mockApolloClient = {
query: vi.fn(),
mutate: vi.fn(),
stop: vi.fn(),
};
beforeEach(() => {
clientFactory = {
createClient: vi.fn().mockResolvedValue(mockApolloClient),
};
apiKeyService = {
getOrCreateLocalApiKey: vi.fn().mockResolvedValue('test-connect-key'),
};
service = new InternalClientService(
clientFactory as any,
apiKeyService as any
);
});
afterEach(() => {
vi.clearAllMocks();
});
it('should be defined', () => {
expect(service).toBeDefined();
});
describe('getClient', () => {
it('should create a client with Connect API key and subscriptions', async () => {
const client = await service.getClient();
// The API key is now fetched lazily through getApiKey function
expect(clientFactory.createClient).toHaveBeenCalledWith({
getApiKey: expect.any(Function),
enableSubscriptions: true,
});
// Verify the getApiKey function works correctly when called
const callArgs = vi.mocked(clientFactory.createClient).mock.calls[0][0];
const apiKey = await callArgs.getApiKey();
expect(apiKey).toBe('test-connect-key');
expect(apiKeyService.getOrCreateLocalApiKey).toHaveBeenCalled();
expect(client).toBe(mockApolloClient);
});
it('should return cached client on subsequent calls', async () => {
const client1 = await service.getClient();
const client2 = await service.getClient();
expect(client1).toBe(client2);
expect(clientFactory.createClient).toHaveBeenCalledTimes(1);
});
it('should handle concurrent calls correctly', async () => {
// Create a delayed mock to simulate async client creation
let resolveClientCreation: (value: any) => void;
const clientCreationPromise = new Promise((resolve) => {
resolveClientCreation = resolve;
});
vi.mocked(clientFactory.createClient).mockReturnValueOnce(clientCreationPromise);
// Start multiple concurrent calls
const promise1 = service.getClient();
const promise2 = service.getClient();
const promise3 = service.getClient();
// Resolve the client creation
resolveClientCreation!(mockApolloClient);
// Wait for all promises to resolve
const [client1, client2, client3] = await Promise.all([promise1, promise2, promise3]);
// All should return the same client
expect(client1).toBe(mockApolloClient);
expect(client2).toBe(mockApolloClient);
expect(client3).toBe(mockApolloClient);
// createClient should only have been called once
expect(clientFactory.createClient).toHaveBeenCalledTimes(1);
});
it('should handle errors during client creation', async () => {
const error = new Error('Failed to create client');
vi.mocked(clientFactory.createClient).mockRejectedValueOnce(error);
await expect(service.getClient()).rejects.toThrow();
// The in-flight promise should be cleared after error
// A subsequent call should attempt creation again
vi.mocked(clientFactory.createClient).mockResolvedValueOnce(mockApolloClient);
const client = await service.getClient();
expect(client).toBe(mockApolloClient);
expect(clientFactory.createClient).toHaveBeenCalledTimes(2);
});
});
describe('clearClient', () => {
it('should stop and clear the client', async () => {
// First create a client
await service.getClient();
// Clear the client
service.clearClient();
expect(mockApolloClient.stop).toHaveBeenCalled();
});
it('should handle clearing when no client exists', () => {
// Should not throw when clearing a non-existent client
expect(() => service.clearClient()).not.toThrow();
});
it('should create a new client after clearing', async () => {
// Create initial client
await service.getClient();
// Clear it
service.clearClient();
// Reset mock to return a new client
const newMockClient = {
query: vi.fn(),
mutate: vi.fn(),
stop: vi.fn(),
};
vi.mocked(clientFactory.createClient).mockResolvedValueOnce(newMockClient);
// Create new client
const newClient = await service.getClient();
// Should have created client twice total
expect(clientFactory.createClient).toHaveBeenCalledTimes(2);
expect(newClient).toBe(newMockClient);
});
it('should clear in-flight promise when clearing client', async () => {
// Create a delayed mock to simulate async client creation
let resolveClientCreation: (value: any) => void;
const clientCreationPromise = new Promise((resolve) => {
resolveClientCreation = resolve;
});
vi.mocked(clientFactory.createClient).mockReturnValueOnce(clientCreationPromise);
// Start client creation
const promise1 = service.getClient();
// Clear client while creation is in progress
service.clearClient();
// Resolve the original creation
resolveClientCreation!(mockApolloClient);
await promise1;
// Reset mock for new client
const newMockClient = {
query: vi.fn(),
mutate: vi.fn(),
stop: vi.fn(),
};
vi.mocked(clientFactory.createClient).mockResolvedValueOnce(newMockClient);
// Try to get client again - should create a new one
const client = await service.getClient();
expect(client).toBe(newMockClient);
expect(clientFactory.createClient).toHaveBeenCalledTimes(2);
});
it('should handle clearClient during creation followed by new getClient call', async () => {
// Create two delayed mocks to simulate async client creation
let resolveFirstCreation: (value: any) => void;
let resolveSecondCreation: (value: any) => void;
const firstCreationPromise = new Promise((resolve) => {
resolveFirstCreation = resolve;
});
const secondCreationPromise = new Promise((resolve) => {
resolveSecondCreation = resolve;
});
const firstMockClient = {
query: vi.fn(),
mutate: vi.fn(),
stop: vi.fn(),
};
const secondMockClient = {
query: vi.fn(),
mutate: vi.fn(),
stop: vi.fn(),
};
vi.mocked(clientFactory.createClient)
.mockReturnValueOnce(firstCreationPromise)
.mockReturnValueOnce(secondCreationPromise);
// Thread A: Start first client creation
const promiseA = service.getClient();
// Thread B: Clear client while first creation is in progress
service.clearClient();
// Thread C: Start second client creation
const promiseC = service.getClient();
// Resolve first creation (should not set client)
resolveFirstCreation!(firstMockClient);
const clientA = await promiseA;
// Resolve second creation (should set client)
resolveSecondCreation!(secondMockClient);
const clientC = await promiseC;
// Both should return their respective clients
expect(clientA).toBe(firstMockClient);
expect(clientC).toBe(secondMockClient);
// But only the second client should be cached
const cachedClient = await service.getClient();
expect(cachedClient).toBe(secondMockClient);
// Should have created exactly 2 clients
expect(clientFactory.createClient).toHaveBeenCalledTimes(2);
});
it('should handle rapid clear and get cycles correctly', async () => {
// Test rapid clear/get cycles
const clients: any[] = [];
for (let i = 0; i < 3; i++) {
const mockClient = {
query: vi.fn(),
mutate: vi.fn(),
stop: vi.fn(),
};
clients.push(mockClient);
vi.mocked(clientFactory.createClient).mockResolvedValueOnce(mockClient);
}
// Cycle 1: Create and immediately clear
const promise1 = service.getClient();
service.clearClient();
const client1 = await promise1;
expect(client1).toBe(clients[0]);
// Cycle 2: Create and immediately clear
const promise2 = service.getClient();
service.clearClient();
const client2 = await promise2;
expect(client2).toBe(clients[1]);
// Cycle 3: Create and let it complete
const client3 = await service.getClient();
expect(client3).toBe(clients[2]);
// Verify the third client is cached
const cachedClient = await service.getClient();
expect(cachedClient).toBe(clients[2]);
// Should have created exactly 3 clients
expect(clientFactory.createClient).toHaveBeenCalledTimes(3);
});
});
});

View File

@@ -1,135 +1,74 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ApolloClient, InMemoryCache, NormalizedCacheObject } from '@apollo/client/core/index.js';
import { split } from '@apollo/client/link/core/index.js';
import { onError } from '@apollo/client/link/error/index.js';
import { HttpLink } from '@apollo/client/link/http/index.js';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions/index.js';
import { getMainDefinition } from '@apollo/client/utilities/index.js';
import { createClient } from 'graphql-ws';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { ApolloClient, NormalizedCacheObject } from '@apollo/client/core/index.js';
import { INTERNAL_CLIENT_SERVICE_TOKEN, type InternalGraphQLClientFactory } from '@unraid/shared';
import { ConnectApiKeyService } from '../authn/connect-api-key.service.js';
/**
* Internal GraphQL "RPC" client.
*
* Unfortunately, there's no simple way to make perform internal gql operations that go through
* all of the validations, filters, authorization, etc. in our setup.
*
* The simplest and most maintainable solution, unfortunately, is to maintain an actual graphql client
* that queries our own graphql server.
*
* This service handles the lifecycle and construction of that client.
* Connect-specific internal GraphQL client.
*
* This uses the shared GraphQL client factory with Connect's API key
* and enables subscriptions for real-time updates.
*/
@Injectable()
export class InternalClientService {
private readonly logger = new Logger(InternalClientService.name);
private client: ApolloClient<NormalizedCacheObject> | null = null;
private clientCreationPromise: Promise<ApolloClient<NormalizedCacheObject>> | null = null;
constructor(
private readonly configService: ConfigService,
@Inject(INTERNAL_CLIENT_SERVICE_TOKEN)
private readonly clientFactory: InternalGraphQLClientFactory,
private readonly apiKeyService: ConnectApiKeyService
) {}
private PROD_NGINX_PORT = 80;
private logger = new Logger(InternalClientService.name);
private client: ApolloClient<NormalizedCacheObject> | null = null;
private getNginxPort() {
return Number(this.configService.get('store.emhttp.nginx.httpPort', this.PROD_NGINX_PORT));
}
/**
* Get the port override from the environment variable PORT. e.g. during development.
* If the port is a socket port, return undefined.
*/
private getNonSocketPortOverride() {
const port = this.configService.get<string | number | undefined>('PORT');
if (!port || port.toString().includes('.sock')) {
return undefined;
}
return Number(port);
}
/**
* Get the API address for the given protocol.
* @param protocol - The protocol to use.
* @param port - The port to use.
* @returns The API address.
*/
private getApiAddress(protocol: 'http' | 'ws', port = this.getNginxPort()) {
const portOverride = this.getNonSocketPortOverride();
if (portOverride) {
return `${protocol}://127.0.0.1:${portOverride}/graphql`;
}
if (port !== this.PROD_NGINX_PORT) {
return `${protocol}://127.0.0.1:${port}/graphql`;
}
return `${protocol}://127.0.0.1/graphql`;
}
private createApiClient({ apiKey }: { apiKey: string }) {
const httpUri = this.getApiAddress('http');
const wsUri = this.getApiAddress('ws');
this.logger.debug('Internal GraphQL URL: %s', httpUri);
const httpLink = new HttpLink({
uri: httpUri,
fetch,
headers: {
Origin: '/var/run/unraid-cli.sock',
'x-api-key': apiKey,
'Content-Type': 'application/json',
},
});
const wsLink = new GraphQLWsLink(
createClient({
url: wsUri,
connectionParams: () => ({ 'x-api-key': apiKey }),
})
);
const splitLink = split(
({ query }) => {
const definition = getMainDefinition(query);
return (
definition.kind === 'OperationDefinition' && definition.operation === 'subscription'
);
},
wsLink,
httpLink
);
const errorLink = onError(({ networkError }) => {
if (networkError) {
this.logger.warn('[GRAPHQL-CLIENT] NETWORK ERROR ENCOUNTERED %o', networkError);
}
});
return new ApolloClient({
defaultOptions: {
query: {
fetchPolicy: 'no-cache',
},
mutate: {
fetchPolicy: 'no-cache',
},
},
cache: new InMemoryCache(),
link: errorLink.concat(splitLink),
});
}
public async getClient() {
public async getClient(): Promise<ApolloClient<NormalizedCacheObject>> {
// If client already exists, return it
if (this.client) {
return this.client;
}
const localApiKey = await this.apiKeyService.getOrCreateLocalApiKey();
this.client = this.createApiClient({ apiKey: localApiKey });
return this.client;
// If client creation is in progress, wait for it
if (this.clientCreationPromise) {
return this.clientCreationPromise;
}
// Start client creation and store the promise
const creationPromise = this.createClient();
this.clientCreationPromise = creationPromise;
try {
// Wait for client creation to complete
const client = await creationPromise;
// Only set the client if this is still the current creation promise
// (if clearClient was called, clientCreationPromise would be null)
if (this.clientCreationPromise === creationPromise) {
this.client = client;
}
return client;
} finally {
// Clear the in-flight promise only if it's still ours
if (this.clientCreationPromise === creationPromise) {
this.clientCreationPromise = null;
}
}
}
private async createClient(): Promise<ApolloClient<NormalizedCacheObject>> {
// Create a client with a function to get Connect's API key dynamically
const client = await this.clientFactory.createClient({
getApiKey: () => this.apiKeyService.getOrCreateLocalApiKey(),
enableSubscriptions: true
});
this.logger.debug('Created Connect internal GraphQL client with subscriptions enabled');
return client;
}
public clearClient() {
// Stop the Apollo client to terminate any active processes
this.client?.stop();
this.client = null;
this.clientCreationPromise = null;
}
}

View File

@@ -11,8 +11,7 @@
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true
},
"forceConsistentCasingInFileNames": true },
"include": ["src/**/*.ts"],
"exclude": ["node_modules", "dist"]
}