mirror of
https://github.com/unraid/api.git
synced 2026-01-09 10:10:45 -06:00
refactor: rework relay ws
This commit is contained in:
@@ -1,505 +0,0 @@
|
||||
import { Mutex, MutexInterface } from 'async-mutex';
|
||||
import WebSocket, { Server as WebsocketServer } from 'ws';
|
||||
|
||||
import { ONE_MINUTE, ONE_SECOND } from '../consts';
|
||||
import { log } from '../core';
|
||||
import { AppError } from '../core/errors';
|
||||
import { isNodeError, sleep } from '../core/utils';
|
||||
import { sockets } from '../sockets';
|
||||
import { backoff } from './utils';
|
||||
|
||||
export interface WebSocketWithHeartBeat extends WebSocket {
|
||||
heartbeat?: NodeJS.Timeout;
|
||||
}
|
||||
|
||||
function heartbeat(this: WebSocketWithHeartBeat) {
|
||||
if (this.heartbeat) {
|
||||
clearTimeout(this.heartbeat);
|
||||
}
|
||||
|
||||
// Use `WebSocket#terminate()`, which immediately destroys the connection,
|
||||
// instead of `WebSocket#close()`, which waits for the close timer.
|
||||
// Delay should be equal to the interval at which your server
|
||||
// sends out pings plus a conservative assumption of the latency.
|
||||
this.heartbeat = setTimeout(() => {
|
||||
this.terminate();
|
||||
}, 30000 + 1000);
|
||||
}
|
||||
|
||||
interface Options {
|
||||
name: string;
|
||||
uri: string;
|
||||
apiKey: string;
|
||||
logger: typeof log;
|
||||
lazy: boolean;
|
||||
wsServer: WebsocketServer;
|
||||
}
|
||||
|
||||
export class CustomSocket {
|
||||
public name: string;
|
||||
public uri: string;
|
||||
public connection?: WebSocketWithHeartBeat;
|
||||
|
||||
protected apiKey: string;
|
||||
protected logger: typeof log;
|
||||
protected connectionAttempts = 0;
|
||||
|
||||
private lock?: MutexInterface;
|
||||
private isOutdated = false;
|
||||
|
||||
constructor(public options: Partial<Options> = {}) {
|
||||
this.name = options.name ?? 'CustomSocket';
|
||||
this.uri = options.uri ?? 'localhost';
|
||||
this.apiKey = options.apiKey ?? '';
|
||||
this.logger = options.logger ?? log;
|
||||
|
||||
// Connect right away
|
||||
if (!options.lazy) {
|
||||
this.connect().catch((error: unknown) => {
|
||||
if (isNodeError(error)) {
|
||||
log.error('Failed connecting with error %s', error.message);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public isConnected() {
|
||||
return this.connection && (this.connection.readyState === this.connection.OPEN);
|
||||
}
|
||||
|
||||
public isConnecting() {
|
||||
return this.connection && (this.connection.readyState === this.connection.CONNECTING);
|
||||
}
|
||||
|
||||
public onConnect() {
|
||||
const logger = this.logger;
|
||||
const connection = this.connection;
|
||||
const apiKey = this.apiKey;
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
const customSocket = this;
|
||||
return async function (this: WebSocketWithHeartBeat) {
|
||||
try {
|
||||
if (!apiKey || (typeof apiKey === 'string' && apiKey.length === 0)) {
|
||||
throw new AppError('Missing key', 422);
|
||||
}
|
||||
|
||||
logger.debug('Connected via %s.', connection?.url);
|
||||
|
||||
// Reset connection attempts
|
||||
customSocket.connectionAttempts = 0;
|
||||
} catch (error: unknown) {
|
||||
if (isNodeError(error, AppError)) {
|
||||
this.close(Number(error.code ?? 500), 'INTERNAL_SERVER_ERROR');
|
||||
} else {
|
||||
this.close(500, 'INTERNAL_SERVER_ERROR');
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public onMessage() {
|
||||
const logger = this.logger;
|
||||
return async function (message: string, ...args: any[]) {
|
||||
logger.silly('message="%s" args="%s"', message, ...args);
|
||||
};
|
||||
}
|
||||
|
||||
public async connect(retryAttempt = 0) {
|
||||
if (this.isOutdated) {
|
||||
this.logger.error('This client is currently outdated, please update unraid-api to reconnect!');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug('Connecting to %s', this.uri);
|
||||
const lock = await this.getLock();
|
||||
try {
|
||||
this.logger.debug('Lock aquired for connection to %s', this.uri);
|
||||
|
||||
// Set retry attempt count
|
||||
await this.setRetryAttempt(retryAttempt);
|
||||
|
||||
// Get the current apiKey
|
||||
this.apiKey = await this.getApiKey();
|
||||
|
||||
// Check the connection is allowed
|
||||
await this.isConnectionAllowed();
|
||||
|
||||
// Cleanup old connections
|
||||
// await this.cleanup();
|
||||
|
||||
// Connect to endpoint
|
||||
await this._connect();
|
||||
|
||||
// Log we connected
|
||||
this.logger.debug('Connected to %s', this.uri);
|
||||
} catch (error: unknown) {
|
||||
this.logger.error('Failed connecting reason=%s', (error as Error).message);
|
||||
} finally {
|
||||
lock.release();
|
||||
}
|
||||
}
|
||||
|
||||
public async disconnect(code?: number, message?: string) {
|
||||
this.logger.debug('Disconnecting from %s', this.uri);
|
||||
const lock = await this.getLock();
|
||||
try {
|
||||
this.logger.debug('Lock aquired for disconnection from %s', this.uri);
|
||||
|
||||
// Don't try and disconnect if there's no connection
|
||||
if (!this.connection || (this.connection.readyState === this.connection.CLOSED)) {
|
||||
this.logger.debug('Cannot disconnect from %s as it\'s already disconnected', this.uri);
|
||||
return;
|
||||
}
|
||||
|
||||
// If there's a custom code pass it to the close method
|
||||
if (code) {
|
||||
this.logger.error('Disconnect with code=%s reason=%s', code, message);
|
||||
this.connection.close(code, message);
|
||||
return;
|
||||
}
|
||||
|
||||
// Fallback to a "ok" disconnect
|
||||
// 4200 === ok
|
||||
this.logger.error('Disconnect with code=%s reason=%s', code, 'OK');
|
||||
this.connection.close(4200, 'OK');
|
||||
} catch (error: unknown) {
|
||||
this.logger.error('Failed disconnecting code=%s reason=%s', code, (error as Error).message);
|
||||
} finally {
|
||||
lock.release();
|
||||
}
|
||||
}
|
||||
|
||||
public async reconnect() {
|
||||
this.logger.warn(`Reconnecting to ${this.uri}`);
|
||||
return this.disconnect();
|
||||
}
|
||||
|
||||
public onError() {
|
||||
// Connection attempts
|
||||
let connectionAttempts = this.connectionAttempts;
|
||||
let shouldReconnect = true;
|
||||
|
||||
const logger = this.logger;
|
||||
const connect = this.connect.bind(this);
|
||||
const uri = this.uri;
|
||||
const responses = {
|
||||
// Unauthorized - Invalid/missing API key.
|
||||
401: async () => {
|
||||
this.logger.debug('Invalid API key, waiting for new key...');
|
||||
shouldReconnect = false;
|
||||
},
|
||||
// Outdated
|
||||
426: async () => {
|
||||
// Mark this client as outdated so it doesn't reconnect
|
||||
this.isOutdated = true;
|
||||
},
|
||||
// Rate limited
|
||||
429: async message => {
|
||||
try {
|
||||
let interval: NodeJS.Timeout | undefined;
|
||||
const retryAfter = parseInt(message['Retry-After'], 10) || 30;
|
||||
this.logger.debug('Rate limited, retrying after %ss', retryAfter);
|
||||
|
||||
// Less than 30s
|
||||
if (retryAfter <= 30) {
|
||||
let seconds = retryAfter;
|
||||
|
||||
// Print retry once per second
|
||||
interval = setInterval(() => {
|
||||
seconds--;
|
||||
this.logger.debug('Retrying connection in %ss', seconds);
|
||||
}, ONE_SECOND);
|
||||
}
|
||||
|
||||
if (retryAfter >= 1) {
|
||||
await sleep(ONE_SECOND * retryAfter);
|
||||
}
|
||||
|
||||
if (interval) {
|
||||
clearInterval(interval);
|
||||
}
|
||||
} catch {}
|
||||
},
|
||||
// Server Error
|
||||
500: async () => {
|
||||
// Something went wrong on the connection
|
||||
// Let's wait an extra bit
|
||||
await sleep(ONE_SECOND * 5);
|
||||
}
|
||||
};
|
||||
return async function (this: WebSocketWithHeartBeat, code: number, message: string) {
|
||||
try {
|
||||
// Log disconnection
|
||||
logger.error('HTTP connection closed with code=%s reason="%s"', code, code === 1006 ? 'Terminated' : message);
|
||||
|
||||
// Stop ws heartbeat
|
||||
if (this.heartbeat) {
|
||||
clearTimeout(this.heartbeat);
|
||||
}
|
||||
|
||||
// Known status code
|
||||
if (Object.keys(responses).includes(`${code}`)) {
|
||||
await responses[code]();
|
||||
} else {
|
||||
// Unknown status code
|
||||
await responses[500]();
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
logger.error('HTTP connection closed with code=%s caught_error="%s"', code, (error as Error).message);
|
||||
}
|
||||
|
||||
// We shouldn't reconnect
|
||||
if (!shouldReconnect) {
|
||||
logger.error('Skipping reconnecting to %s as "shouldReconnect" is true', uri);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const sleepMs = backoff(connectionAttempts, ONE_MINUTE, 5);
|
||||
logger.error('Waiting for %s before re-connecting to %s', sleepMs, uri);
|
||||
|
||||
// Wait a few seconds
|
||||
await sleep(sleepMs * 1000);
|
||||
|
||||
// Reconnect
|
||||
logger.error('Establishing connection to %s', uri);
|
||||
await connect(connectionAttempts + 1);
|
||||
} catch (error: unknown) {
|
||||
logger.error('Failed reconnecting to %s reason="%s"', uri, (error as Error).message);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected onDisconnect() {
|
||||
// Connection attempts
|
||||
let connectionAttempts = this.connectionAttempts;
|
||||
let shouldReconnect = true;
|
||||
|
||||
const logger = this.logger;
|
||||
const connect = this.connect.bind(this);
|
||||
const uri = this.uri;
|
||||
const responses = {
|
||||
// Mothership dropped, this can happen for various reasons
|
||||
// 1. Mothership's relay restarted
|
||||
// 2. The client's internet restarted
|
||||
// 3. The client's internet is flakey
|
||||
// 4. Who knows?
|
||||
1006: async () => {
|
||||
// We some how lost connection to mothership, this was not expected by the client.
|
||||
// Let's give mothership's relay time to come back up incase it restarted reconnect.
|
||||
this.logger.debug('We lost connection to mothership, reconnecting...');
|
||||
|
||||
// Wait for 30s before allowing reconnection
|
||||
await sleep(ONE_SECOND * 30);
|
||||
|
||||
// Let's reset the reconnect count so we reconnect instantly
|
||||
this.connectionAttempts = 0;
|
||||
connectionAttempts = 0;
|
||||
},
|
||||
// OK
|
||||
4200: async () => {
|
||||
// This is usually because the API key is updated
|
||||
// Let's reset the reconnect count so we reconnect instantly
|
||||
this.connectionAttempts = 0;
|
||||
connectionAttempts = 0;
|
||||
},
|
||||
// Unauthorized - Invalid/missing API key.
|
||||
4401: async () => {
|
||||
this.logger.debug('Invalid API key, waiting for new key...');
|
||||
shouldReconnect = false;
|
||||
},
|
||||
// Request Timeout - Mothership disconnected us.
|
||||
4408: async () => {
|
||||
// Mothership kicked this connection for any number of reasons
|
||||
this.logger.debug('Kicked by mothership, reconnecting...');
|
||||
|
||||
// Wait for 5s before allowing reconnection
|
||||
await sleep(ONE_SECOND * 5);
|
||||
|
||||
// Let's reset the reconnect count so we reconnect instantly
|
||||
this.connectionAttempts = 0;
|
||||
connectionAttempts = 0;
|
||||
},
|
||||
// Outdated
|
||||
4426: async () => {
|
||||
// Mark this client as outdated so it doesn't reconnect
|
||||
this.isOutdated = true;
|
||||
},
|
||||
// Rate limited
|
||||
4429: async message => {
|
||||
try {
|
||||
let interval: NodeJS.Timeout | undefined;
|
||||
const retryAfter = parseInt(message['Retry-After'], 10) || 30;
|
||||
this.logger.debug('Rate limited, retrying after %ss', retryAfter);
|
||||
|
||||
// Less than 30s
|
||||
if (retryAfter <= 30) {
|
||||
let seconds = retryAfter;
|
||||
|
||||
// Print retry once per second
|
||||
interval = setInterval(() => {
|
||||
seconds--;
|
||||
this.logger.debug('Retrying connection in %ss', seconds);
|
||||
}, ONE_SECOND);
|
||||
}
|
||||
|
||||
if (retryAfter >= 1) {
|
||||
await sleep(ONE_SECOND * retryAfter);
|
||||
}
|
||||
|
||||
if (interval) {
|
||||
clearInterval(interval);
|
||||
}
|
||||
} catch {}
|
||||
},
|
||||
// Server Error
|
||||
4500: async () => {
|
||||
// Something went wrong on the connection
|
||||
// Let's wait an extra bit
|
||||
await sleep(ONE_SECOND * 5);
|
||||
}
|
||||
};
|
||||
return async function (this: WebSocketWithHeartBeat, code: number, message: string) {
|
||||
try {
|
||||
// Log disconnection
|
||||
logger.error('Websocket connection closed with code=%s reason="%s"', code, code === 1006 ? 'Terminated' : message);
|
||||
|
||||
// Stop ws heartbeat
|
||||
if (this.heartbeat) {
|
||||
clearTimeout(this.heartbeat);
|
||||
}
|
||||
|
||||
// Known status code
|
||||
if (Object.keys(responses).includes(`${code}`)) {
|
||||
await responses[code](message);
|
||||
} else {
|
||||
// Unknown status code
|
||||
await responses[500]();
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
logger.error('Websocket connection closed with code=%s reason="%s"', code, (error as Error).message);
|
||||
}
|
||||
|
||||
// We shouldn't reconnect
|
||||
if (!shouldReconnect) {
|
||||
logger.error('Skipping reconnecting to %s as "shouldReconnect" is true', uri);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const sleepMs = backoff(connectionAttempts, ONE_MINUTE, 5);
|
||||
logger.error('Waiting for %s before re-connecting to %s', sleepMs, uri);
|
||||
|
||||
// Wait a few seconds
|
||||
await sleep(sleepMs);
|
||||
|
||||
// Reconnect
|
||||
logger.error('Establishing connection to %s', uri);
|
||||
await connect(connectionAttempts + 1);
|
||||
} catch (error: unknown) {
|
||||
logger.error('Failed reconnecting to %s reason="%s"', uri, (error as Error).message);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected async cleanup() {
|
||||
// Kill existing socket connection
|
||||
if (this.connection?.heartbeat) {
|
||||
this.connection.close(408, 'REQUEST_TIMEOUT');
|
||||
}
|
||||
}
|
||||
|
||||
protected async getApiKey() {
|
||||
return '';
|
||||
}
|
||||
|
||||
protected async getHeaders() {
|
||||
return {};
|
||||
}
|
||||
|
||||
protected async isConnectionAllowed() {
|
||||
return true;
|
||||
}
|
||||
|
||||
protected async sendMessage(clientName: 'relay' | 'internalGraphql', message?: string, timeout = 1000) {
|
||||
const client = sockets.get(clientName)?.connection;
|
||||
|
||||
try {
|
||||
if (!client || client.readyState === client.CONNECTING) {
|
||||
this.logger.silly('Waiting %ss to retry sending to %s.', timeout / 1000, client?.url);
|
||||
// Wait for $timeout seconds
|
||||
await sleep(timeout);
|
||||
|
||||
// Retry sending
|
||||
return this.sendMessage(clientName, message, timeout);
|
||||
}
|
||||
|
||||
// Only send when socket is open
|
||||
if (client.readyState === client.OPEN) {
|
||||
client.send(message);
|
||||
this.logger.silly('Message sent to %s.', client?.url);
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait 10 seconds if we're closed
|
||||
if (client.readyState === client.CLOSED) {
|
||||
// Connection closed waiting 10s to retry
|
||||
await sleep(10 * ONE_SECOND);
|
||||
|
||||
// Retry sending
|
||||
return this.sendMessage(clientName, message, timeout);
|
||||
}
|
||||
|
||||
// Failed replying as socket isn't open
|
||||
this.logger.error('Failed replying to %s. state=%s message="%s"', client?.url, client.readyState, message);
|
||||
} catch (error: unknown) {
|
||||
this.logger.error('Failed replying to %s with %s.', client?.url, (error as Error).message);
|
||||
}
|
||||
}
|
||||
|
||||
private async getLock() {
|
||||
if (!this.lock) {
|
||||
this.lock = new Mutex();
|
||||
}
|
||||
|
||||
const release = await this.lock.acquire();
|
||||
return {
|
||||
release
|
||||
};
|
||||
}
|
||||
|
||||
private async setRetryAttempt(currentRetryAttempt = 0) {
|
||||
this.connectionAttempts += 1;
|
||||
if (currentRetryAttempt >= 1) {
|
||||
this.logger.debug('Connection attempt %s', currentRetryAttempt);
|
||||
}
|
||||
}
|
||||
|
||||
private async _connect() {
|
||||
this.connection = new WebSocket(this.uri, ['graphql-ws'], {
|
||||
headers: await this.getHeaders()
|
||||
});
|
||||
|
||||
this.connection.on('ping', heartbeat.bind(this.connection));
|
||||
this.connection.on('error', this.onError());
|
||||
this.connection.on('close', this.onDisconnect());
|
||||
this.connection.on('open', this.onConnect());
|
||||
this.connection.on('message', this.onMessage());
|
||||
|
||||
// Unbind handlers and then kill the connection
|
||||
process.once('SIGTERM', () => {
|
||||
this.logger.info('Closing mothership connection...');
|
||||
|
||||
// Unbind handlers
|
||||
this.connection?.removeAllListeners();
|
||||
|
||||
// Kill connection with mothership
|
||||
this.connection?.close();
|
||||
|
||||
this.logger.info('Closed mothership connection!');
|
||||
this.logger.info('Process exiting...');
|
||||
|
||||
process.exit(0);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,2 +1,114 @@
|
||||
export { MothershipSocket } from './sockets/mothership';
|
||||
export { InternalGraphql } from './sockets/internal-graphql';
|
||||
import GracefulWebSocket from 'graceful-ws';
|
||||
import { INTERNAL_WS_LINK, MOTHERSHIP_RELAY_WS_LINK } from '../consts';
|
||||
|
||||
let internal: GracefulWebSocket;
|
||||
let relay: GracefulWebSocket;
|
||||
|
||||
let internalOpen = false;
|
||||
let relayOpen = false;
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-empty-function
|
||||
const noop = () => {};
|
||||
|
||||
const startInternal = (apiKey: string) => {
|
||||
internal = new GracefulWebSocket(INTERNAL_WS_LINK, ['graphql-ws'], {
|
||||
headers: {
|
||||
'x-api-key': apiKey,
|
||||
}
|
||||
});
|
||||
|
||||
internal.on('connected', () => {
|
||||
internalOpen = true;
|
||||
internal.send(JSON.stringify({
|
||||
type: 'connection_init',
|
||||
payload: {
|
||||
'x-api-key': apiKey
|
||||
}
|
||||
}));
|
||||
|
||||
// Internal is ready at this point
|
||||
if (!relay) {
|
||||
startRelay(apiKey);
|
||||
}
|
||||
});
|
||||
|
||||
internal.on('disconnected', () => {
|
||||
internalOpen = false;
|
||||
});
|
||||
|
||||
internal.on('message', e => {
|
||||
// Skip auth acknowledgement
|
||||
if (e.data === '{"type":"connection_ack"}') {
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip keep-alive if relay is down
|
||||
if (e.data === '{"type":"ka"}' && (!relay || relay.readyState === 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (relayOpen) {
|
||||
relay.send(e.data);
|
||||
}
|
||||
});
|
||||
|
||||
internal.on('error', error => {
|
||||
console.log('INTERNAL:ERROR', error);
|
||||
});
|
||||
};
|
||||
|
||||
const startRelay = (apiKey: string) => {
|
||||
relay = new GracefulWebSocket(MOTHERSHIP_RELAY_WS_LINK, ['graphql-ws'], {
|
||||
headers: {
|
||||
'x-api-key': apiKey,
|
||||
'x-flash-guid': '0951-1666-3841-AF30A0001E64',
|
||||
'x-key-file': '',
|
||||
'x-server-name': 'mocked.tld',
|
||||
'x-unraid-api-version': '2.21.3'
|
||||
}
|
||||
});
|
||||
|
||||
// Connection-state related events
|
||||
relay.on('connected', () => {
|
||||
relayOpen = true;
|
||||
});
|
||||
relay.on('disconnected', () => {
|
||||
relayOpen = false;
|
||||
// Close internal
|
||||
internal.close();
|
||||
// Start internal
|
||||
internal.start();
|
||||
});
|
||||
relay.on('killed', noop);
|
||||
relay.on('error', noop);
|
||||
|
||||
// Message event
|
||||
relay.on('message', e => {
|
||||
if (internalOpen) {
|
||||
internal.send(e.data);
|
||||
}
|
||||
});
|
||||
|
||||
relay.on('unexpected-response', (code, message) => {
|
||||
switch (code) {
|
||||
case 429:
|
||||
if (message === 'API_KEY_IN_USE') {
|
||||
setTimeout(() => {
|
||||
// Restart relay connection
|
||||
relay.start();
|
||||
}, 10_000);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
// Restart relay connection
|
||||
relay.start();
|
||||
|
||||
break;
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
// This starts it all
|
||||
startInternal(API_KEY_HERE);
|
||||
|
||||
@@ -1,97 +0,0 @@
|
||||
import { INTERNAL_WS_LINK } from '../../consts';
|
||||
import { apiManager, relayLogger } from '../../core';
|
||||
import { isNodeError, sleep } from '../../core/utils';
|
||||
import { AppError } from '../../core/errors';
|
||||
import { CustomSocket, WebSocketWithHeartBeat } from '../custom-socket';
|
||||
|
||||
export class InternalGraphql extends CustomSocket {
|
||||
constructor(options: CustomSocket['options'] = {}) {
|
||||
super({
|
||||
name: 'InternalGraphql',
|
||||
uri: INTERNAL_WS_LINK,
|
||||
logger: relayLogger,
|
||||
...options
|
||||
});
|
||||
}
|
||||
|
||||
onMessage() {
|
||||
const logger = this.logger;
|
||||
const sendMessage = this.sendMessage.bind(this);
|
||||
return async function (this: WebSocketWithHeartBeat, data: string) {
|
||||
try {
|
||||
// Internal API accepted our authentication message
|
||||
if (data === '{"type":"connection_ack"}') {
|
||||
logger.debug('Internal graphql accepted authentication');
|
||||
return;
|
||||
}
|
||||
|
||||
logger.silly('Received message from the internal API, forwarding to the relay');
|
||||
// Forward message
|
||||
await sendMessage('relay', data);
|
||||
logger.silly('Message sent to the relay successfully.');
|
||||
} catch (error: unknown) {
|
||||
if (isNodeError(error, AppError)) {
|
||||
// Relay socket is closed, close internal one
|
||||
if (error.message.includes('WebSocket is not open')) {
|
||||
this.close(4200, JSON.stringify({
|
||||
message: error.message
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
onError() {
|
||||
return async (error: NodeJS.ErrnoException) => {
|
||||
if (error.message === 'WebSocket was closed before the connection was established') {
|
||||
// Likely the internal relay-ws connection was started but then mothership
|
||||
// decided the key was invalid so it killed it
|
||||
// When this happens the relay-ws sometimes is still in the CONNECTING phase
|
||||
// This isn't an actual error so we skip it
|
||||
return;
|
||||
}
|
||||
|
||||
// Socket was still offline try again?
|
||||
if (error.code && ['ENOENT', 'ECONNREFUSED'].includes(error.code)) {
|
||||
// Wait 1s
|
||||
await sleep(1000);
|
||||
|
||||
// Re-connect to internal graphql server
|
||||
await this.connect();
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.error(error);
|
||||
};
|
||||
}
|
||||
|
||||
onConnect() {
|
||||
const apiKey = this.apiKey;
|
||||
const logger = this.logger;
|
||||
return async function (this: WebSocketWithHeartBeat) {
|
||||
// No API key, close internal connection
|
||||
if (!apiKey) {
|
||||
this.close(4403, 'FORBIDDEN');
|
||||
}
|
||||
|
||||
// Authenticate with ourselves
|
||||
logger.debug('Authenticating with internal graphql');
|
||||
this.send(JSON.stringify({
|
||||
type: 'connection_init',
|
||||
payload: {
|
||||
'x-api-key': apiKey
|
||||
}
|
||||
}));
|
||||
};
|
||||
}
|
||||
|
||||
protected async getApiKey() {
|
||||
const key = apiManager.getKey('my_servers');
|
||||
if (!key) {
|
||||
throw new AppError('No API key found.');
|
||||
}
|
||||
|
||||
return key.key;
|
||||
}
|
||||
}
|
||||
@@ -1,208 +0,0 @@
|
||||
import { MOTHERSHIP_GRAPHQL_LINK, MOTHERSHIP_RELAY_WS_LINK, ONE_MINUTE } from '../../consts';
|
||||
import { relayLogger, apiManager, pubsub } from '../../core';
|
||||
import { isNodeError, sleep } from '../../core/utils';
|
||||
import { varState } from '../../core/states';
|
||||
import { subscribeToServers } from '../subscribe-to-servers';
|
||||
import { AppError } from '../../core/errors';
|
||||
import { readFileIfExists } from '../utils';
|
||||
import { CustomSocket, WebSocketWithHeartBeat } from '../custom-socket';
|
||||
import packageJson from '../../../package.json';
|
||||
import { sockets } from '../../sockets';
|
||||
import { userCache, CachedServers } from '../../cache';
|
||||
import originalFetch from 'node-fetch';
|
||||
import fetchRetry from 'fetch-retry';
|
||||
|
||||
const fetch = fetchRetry(originalFetch as any, {
|
||||
retries: 5,
|
||||
retryOn: [429],
|
||||
retryDelay: function (attempt: number, _error, _response) {
|
||||
// Apply random jitter to the reconnection delay
|
||||
return Math.floor(Math.random() * (2500 * attempt));
|
||||
}
|
||||
}) as unknown as typeof originalFetch;
|
||||
|
||||
export class MothershipSocket extends CustomSocket {
|
||||
private mothershipServersEndpoint?: {
|
||||
unsubscribe: () => void;
|
||||
};
|
||||
|
||||
constructor(options: CustomSocket['options'] = {}) {
|
||||
super({
|
||||
name: 'Mothership',
|
||||
uri: MOTHERSHIP_RELAY_WS_LINK,
|
||||
logger: relayLogger,
|
||||
lazy: false,
|
||||
...options
|
||||
});
|
||||
}
|
||||
|
||||
onConnect() {
|
||||
const connectToMothershipsGraphql = this.connectToMothershipsGraphql.bind(this);
|
||||
const queryMothershipsGraphql = this.queryMothershipsGraphql.bind(this);
|
||||
const onConnect = super.onConnect.bind(this);
|
||||
return async function (this: WebSocketWithHeartBeat) {
|
||||
try {
|
||||
// Run super
|
||||
await onConnect().bind(this)();
|
||||
|
||||
// Query /servers on mothership
|
||||
await queryMothershipsGraphql();
|
||||
|
||||
// Sub to /servers on mothership
|
||||
await connectToMothershipsGraphql();
|
||||
} catch (error: unknown) {
|
||||
if (isNodeError(error, AppError)) {
|
||||
const code = (error.code) ?? 500;
|
||||
this.close(`${code}`.length === 4 ? Number(code) : Number(`4${code}`), 'INTERNAL_SERVER_ERROR');
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
onDisconnect() {
|
||||
const onDisconnect = super.onDisconnect().bind(this as unknown as WebSocketWithHeartBeat);
|
||||
return async (code: number, message: string) => {
|
||||
try {
|
||||
// Close connection to motherships's server's endpoint
|
||||
await this.disconnectFromMothershipsGraphql();
|
||||
|
||||
// Close connection to internal graphql
|
||||
const internalGraphqlClient = sockets.get('internalGraphql');
|
||||
if (internalGraphqlClient?.isConnected) {
|
||||
await internalGraphqlClient?.disconnect();
|
||||
}
|
||||
|
||||
// Process disconnection
|
||||
await onDisconnect(code, message);
|
||||
} catch (error: unknown) {
|
||||
if (isNodeError(error, AppError)) {
|
||||
this.logger.debug('Connection closed with code=%s reason="%s"', code, error.message);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// When we get a message from relay send it through to our local graphql instance
|
||||
onMessage() {
|
||||
const sendMessage = this.sendMessage.bind(this);
|
||||
return async (data: string) => {
|
||||
try {
|
||||
this.logger.silly('Recieved message from mothership\'s relay, forwarding to the internal graphql connection');
|
||||
await sendMessage('internalGraphql', data);
|
||||
this.logger.silly('Message sent to the internal graphql connection successfully.');
|
||||
} catch (error: unknown) {
|
||||
if (isNodeError(error, AppError)) {
|
||||
// Something weird happened while processing the message
|
||||
// This is likely a malformed message
|
||||
this.logger.error('Failed sending message to relay.', error);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
onError() {
|
||||
const onError = super.onError().bind(this as unknown as WebSocketWithHeartBeat);
|
||||
const logger = this.logger;
|
||||
return async function (this: WebSocketWithHeartBeat, error: any) {
|
||||
try {
|
||||
const messageParts = error.message.split('Unexpected server response: ');
|
||||
const isHTTPError = messageParts.length === 2;
|
||||
|
||||
// Is this a HTTP error that was passed back before the ws estabished a connection?
|
||||
// This is ususally fired in the "upgrade" phase before the request is upgraded.
|
||||
if (isHTTPError) {
|
||||
// HTTP status code e.g. 401/429/500
|
||||
const code = parseInt(messageParts[1], 10);
|
||||
|
||||
// Process error
|
||||
await onError(code);
|
||||
return;
|
||||
}
|
||||
|
||||
// Connection refused, aka couldn't connect
|
||||
// This is usually because the address is wrong or offline
|
||||
if (error.code === 'ECONNREFUSED') {
|
||||
logger.debug('Couldn\'t connect to %s:%s', error.address, error.port);
|
||||
return;
|
||||
}
|
||||
|
||||
// Closed before connection started
|
||||
if (error.message.toString().includes('WebSocket was closed before the connection was established')) {
|
||||
logger.debug(error.message);
|
||||
return;
|
||||
}
|
||||
|
||||
throw error;
|
||||
} catch {
|
||||
// Unknown error
|
||||
logger.error('UNKNOWN_SOCKET_ERROR', error);
|
||||
|
||||
// Kick the connection
|
||||
this.close(4408, 'REQUEST_TIMEOUT');
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected async getApiKey() {
|
||||
const key = apiManager.getKey('my_servers');
|
||||
if (!key) {
|
||||
throw new AppError('No API key found.');
|
||||
}
|
||||
|
||||
return key.key;
|
||||
}
|
||||
|
||||
protected async getHeaders() {
|
||||
const apiKey = apiManager.getKey('my_servers')?.key!;
|
||||
const regFile = await readFileIfExists(varState.data?.regFile);
|
||||
const keyFile = varState.data?.regFile ? regFile.toString('base64') : '';
|
||||
const serverName = `${varState.data.name}`;
|
||||
|
||||
return {
|
||||
'x-api-key': apiKey,
|
||||
'x-flash-guid': varState.data?.flashGuid ?? '',
|
||||
'x-key-file': keyFile ?? '',
|
||||
'x-server-name': serverName,
|
||||
'x-unraid-api-version': packageJson.version
|
||||
};
|
||||
}
|
||||
|
||||
private async queryMothershipsGraphql() {
|
||||
const response = await fetch(MOTHERSHIP_GRAPHQL_LINK, {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({
|
||||
operationName: 'getServers',
|
||||
variables: {
|
||||
apiKey: this.apiKey
|
||||
},
|
||||
query: 'query getServers($apiKey: String!) {\n servers @auth(apiKey: $apiKey) {\n owner {\n username\n url\n avatar\n }\n guid\n apikey\n name\n status\n wanip\n lanip\n localurl\n remoteurl\n }\n}\n'
|
||||
})
|
||||
});
|
||||
|
||||
// Failed getting servers
|
||||
if (response.status !== 200) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get servers
|
||||
const data = response.json() as any;
|
||||
|
||||
// Update internal cache
|
||||
userCache.set<CachedServers>('mine', {
|
||||
servers: data.servers
|
||||
});
|
||||
|
||||
// Update subscribers
|
||||
await pubsub.publish('servers', {
|
||||
servers: data.servers
|
||||
});
|
||||
}
|
||||
|
||||
private async connectToMothershipsGraphql() {
|
||||
this.mothershipServersEndpoint = await subscribeToServers(this.apiKey);
|
||||
}
|
||||
|
||||
private async disconnectFromMothershipsGraphql() {
|
||||
this.mothershipServersEndpoint?.unsubscribe();
|
||||
}
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
import fs from 'fs';
|
||||
|
||||
/**
|
||||
* Get a number between the lowest and highest value.
|
||||
* @param low Lowest value.
|
||||
* @param high Highest value.
|
||||
*/
|
||||
export const getNumberBetween = (low: number, high: number) => Math.floor((Math.random() * (high - low + 1)) + low);
|
||||
|
||||
/**
|
||||
* Create a jitter of +/- 20%.
|
||||
*/
|
||||
export const applyJitter = (value: number) => {
|
||||
const jitter = getNumberBetween(80, 120) / 100;
|
||||
return Math.floor(value * jitter);
|
||||
};
|
||||
|
||||
export const backoff = (attempt: number, maxDelay: number, multiplier: number) => {
|
||||
const delay = applyJitter(((2.0 ** attempt) - 1.0) * 0.5);
|
||||
return Math.round(Math.min(delay * multiplier, maxDelay));
|
||||
};
|
||||
|
||||
export const readFileIfExists = async (filePath: string): Promise<Buffer> => {
|
||||
try {
|
||||
return await fs.promises.readFile(filePath);
|
||||
} catch {}
|
||||
|
||||
return Buffer.from('');
|
||||
};
|
||||
4
package-lock.json
generated
4
package-lock.json
generated
@@ -7355,6 +7355,10 @@
|
||||
"resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.6.tgz",
|
||||
"integrity": "sha512-nTnJ528pbqxYanhpDYsi4Rd8MAeaBA67+RZ10CM1m3bTAVFEDcd5AuA4a6W5YkGZ1iNXHzZz8T6TBKLeBuNriQ=="
|
||||
},
|
||||
"graceful-ws": {
|
||||
"version": "github:OmgImAlexis/graceful-ws#d38cb2879be854e918c54002bc3ae22e3c244c50",
|
||||
"from": "github:OmgImAlexis/graceful-ws"
|
||||
},
|
||||
"graphql": {
|
||||
"version": "15.5.1",
|
||||
"resolved": "https://registry.npmjs.org/graphql/-/graphql-15.5.1.tgz",
|
||||
|
||||
@@ -77,6 +77,7 @@
|
||||
"get-server-address": "^1.0.1",
|
||||
"glob": "^7.1.7",
|
||||
"globby": "^11.0.3",
|
||||
"graceful-ws": "github:OmgImAlexis/graceful-ws",
|
||||
"graphql": "^15.5.0",
|
||||
"graphql-directive": "^0.2.1",
|
||||
"graphql-fields": "^2.0.3",
|
||||
|
||||
Reference in New Issue
Block a user