diff --git a/app/core/api-manager.ts b/app/core/api-manager.ts index d5e8a8210..4ef72c75c 100644 --- a/app/core/api-manager.ts +++ b/app/core/api-manager.ts @@ -3,10 +3,16 @@ * Written by: Alexis Tyler */ +import chokidar from 'chokidar'; +import { EventEmitter } from 'events'; import toMillisecond from 'ms'; +import dotProp from 'dot-prop'; import { Cache as MemoryCache } from 'clean-cache'; // @ts-ignore import { validate as validateArgument } from 'bycontract'; +import { validateApiKeyFormat, loadState } from './utils'; +import { paths } from './paths'; +import { log } from './log'; export interface CacheItem { /** Machine readable name of the key. */ @@ -14,12 +20,12 @@ export interface CacheItem { /** Owner's id */ userId: string; /** The API key. */ - key: string | number; + key: string; /** When the key will expire in human readable form. This will be converted internally to ms. */ expiration: string; } -export interface AddOptions { +export interface KeyOptions { /** Owner's id */ userId?: string; /** When the key will expire in human readable form. This will be converted internally to ms. */ @@ -28,28 +34,86 @@ export interface AddOptions { interface ApiKey { name: string; - key: string | number; + key: string; userId: string; expiresAt: number; } +interface Options { + watch: boolean; +} + /** * Api manager */ -export class ApiManager { +export class ApiManager extends EventEmitter { private static instance: ApiManager; /** Note: Keys expire by default after 365 days. */ private readonly keys = new MemoryCache(Number(toMillisecond('1y'))); - constructor() { + constructor(options: Options = { watch: true }) { + super({ + captureRejections: true + }); + + // Return or create the singleton class if (ApiManager.instance) { - // This is needed as this is a singleton class // @eslint-disable-next-line no-constructor-return return ApiManager.instance; } + // Create singleton ApiManager.instance = this; + + // Watch for changes to the dynamix.cfg file + // @todo Move API keys to their own file + if (options.watch) { + const basePath = paths.get('dynamix-base')!; + const configPath = paths.get('dynamix-config')!; + chokidar.watch(basePath).on('all', (eventName, filePath) => { + if (filePath === configPath) { + try { + const file = loadState<{ remote: { apikey: string } }>(filePath); + const apiKey = dotProp.get(file, 'remote.apikey') as string; + + validateApiKeyFormat(apiKey); + + // Add the new key + this.replace('my_servers', apiKey, { + userId: '0' + }); + } catch (error) { + // File was deleted + if (error.code === 'ENOENT') { + log.debug('%s was deleted, removing "my_servers" API key.', filePath); + } else { + log.debug('%s, removing "my_servers" API key.', error.message); + } + + // Reset key as it's not valid anymore + this.expire('my_servers'); + }; + } + }); + } + } + + /** + * Replace a key. + * + * Note: This will bump the expiration by the original length. + */ + replace(name: string, key: string, options: KeyOptions) { + // Delete existing key + // @ts-ignore + this.keys.items[name] = null; + + // Add new key + this.add(name, key, options); + + // Emit update + this.emit('replace', name, this.getKey(name)); } /** @@ -59,20 +123,25 @@ export class ApiManager { * * @memberof ApiManager */ - add(name: string, key: string|number, options: AddOptions): void { + add(name: string, key: string, options: KeyOptions): void { const { userId, expiration = '1y' } = options; validateArgument(name, 'string'); - validateArgument(key, 'string|number'); + validateArgument(key, 'string'); validateArgument(expiration, 'string|number'); const ttl = Number(toMillisecond(expiration)); - - this.keys.add(name, { + const keyObject = { name, key, userId - }, ttl); + }; + + // Add new key + this.keys.add(name, keyObject, ttl); + + // Emit update + this.emit('add', name, this.getKey(name)); } /** @@ -83,9 +152,9 @@ export class ApiManager { * @returns `true` if the key is valid, otherwise `false`. * @memberof ApiManager */ - isValid(nameOrKey: string|number, key?: string|number): boolean { - validateArgument(nameOrKey, 'string|number'); - validateArgument(key, 'string|number|undefined'); + isValid(nameOrKey: string, key?: string): boolean { + validateArgument(nameOrKey, 'string'); + validateArgument(key, 'string|undefined'); if (!key) { try { @@ -149,6 +218,7 @@ export class ApiManager { validateArgument(name, 'string'); this.keys.invalidate(name); + this.emit('expire', name); } /** @@ -178,8 +248,8 @@ export class ApiManager { * @returns The API key's machine readable name. * @memberof ApiManager */ - getNameFromKey(key: string|number): string { - validateArgument(key, 'string|number'); + getNameFromKey(key: string): string { + validateArgument(key, 'string'); const keyObject = Object .entries(this.keys.items) diff --git a/app/core/utils/misc/exit-app.ts b/app/core/utils/misc/exit-app.ts index 6bbee718a..1b5238993 100644 --- a/app/core/utils/misc/exit-app.ts +++ b/app/core/utils/misc/exit-app.ts @@ -23,7 +23,7 @@ export const exitApp = (error?: Error, exitCode?: number) => { } // Log last error - log.error(error.message); + log.error(error); // Kill application process.exitCode = exitCode; diff --git a/app/mothership/index.ts b/app/mothership/index.ts index 09a9b29b7..871920cb7 100644 --- a/app/mothership/index.ts +++ b/app/mothership/index.ts @@ -1,18 +1,13 @@ import fs from 'fs'; import WebSocket from 'ws'; import * as Sentry from '@sentry/node'; -import { utils, paths, states, log } from '../core'; -import type { DynamixConfig } from '../core/types'; +import { Mutex, MutexInterface } from 'async-mutex'; import { MOTHERSHIP_RELAY_WS_LINK, INTERNAL_WS_LINK, ONE_MINUTE } from '../consts'; +import { log, apiManager } from '../core'; +import { sleep, getMachineId } from '../core/utils'; +import { varState, networkState } from '../core/states'; import { subscribeToServers } from './subscribe-to-servers'; -const { loadState, sleep, validateApiKeyFormat } = utils; -const { varState } = states; - -// Websocket closed state -// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState -const CLOSED_READY_STATE = 3; - /** * Get a number between the lowest and highest value. * @param low Lowest value. @@ -33,8 +28,6 @@ const backoff = (attempt: number, maxDelay: number, multiplier: number) => { return Math.round(Math.min(delay * multiplier, maxDelay)); }; -let relay: WebSocket; - interface WebSocketWithHeartBeat extends WebSocket { pingTimeout?: NodeJS.Timeout } @@ -61,205 +54,221 @@ const readFileIfExists = (filePath: string) => { return Buffer.from(''); }; -/** - * Connect to unraid's proxy server - */ -export const connectToMothership = async (wsServer: WebSocket.Server, currentRetryAttempt: number = 0) => { - // Kill any existing connection before we proceed - if (relay) { - await disconnectFromMothership(); - } +class MothershipService { + private relayWebsocketLink = MOTHERSHIP_RELAY_WS_LINK; + private internalWsLink = INTERNAL_WS_LINK; - let retryAttempt = currentRetryAttempt; - if (retryAttempt >= 1) { - log.debug(`Reconnecting to mothership, attempt ${retryAttempt}.`); - } - - const apiKey = loadState(paths.get('dynamix-config')!).remote.apikey || ''; - const keyFile = varState.data?.regFile ? readFileIfExists(varState.data?.regFile).toString('base64') : ''; - const serverName = `${varState.data?.name}`; - const lanIp = states.networkState.data.find(network => network.ipaddr[0]).ipaddr[0] || ''; - const machineId = `${await utils.getMachineId()}`; - let localGraphqlApi: WebSocket; - let mothershipServersEndpoint: { + private lock?: MutexInterface; + private relay?: WebSocket; + private connectionAttempt = 0; + private localGraphqlApi?: WebSocketWithHeartBeat; + private mothershipServersEndpoint?: { unsubscribe: () => void; }; - // Ensure API key is in the correct format - try { - validateApiKeyFormat(apiKey); - } catch (error) { - log.debug(error.message); - return; + constructor() {} + + public async getLock() { + if (!this.lock) { + this.lock = new Mutex(); + } + + const release = await this.lock.acquire(); + return { + release + }; } - // Connect to mothership's relay endpoint - // Keep reference outside this scope so we can disconnect later - relay = new WebSocket(MOTHERSHIP_RELAY_WS_LINK, ['graphql-ws'], { - headers: { - 'x-api-key': apiKey, - 'x-flash-guid': varState.data?.flashGuid ?? '', - 'x-key-file': keyFile ?? '', - 'x-server-name': serverName, - 'x-lan-ip': lanIp, - 'x-machine-id': machineId - } - }); - - relay.on('open', async () => { - log.debug(`Connected to mothership's relay via ${MOTHERSHIP_RELAY_WS_LINK}.`); - - // Reset retry attempts - retryAttempt = 0; - - // Connect to the internal graphql server - localGraphqlApi = new WebSocket(INTERNAL_WS_LINK, ['graphql-ws']); - - // Heartbeat - localGraphqlApi.on('ping', () => { - heartbeat.bind(localGraphqlApi)(); - }); - - // Errors - localGraphqlApi.on('error', error => { - Sentry.captureException(error); - log.error('ws:local-relay', 'error', error); - }); - - // Connection to local graphql endpoint is "closed" - localGraphqlApi.on('close', () => { - log.debug('ws:local-relay', 'close'); - }); - - // Connection to local graphql endpoint is "open" - localGraphqlApi.on('open', () => { - log.debug('ws:local-relay', 'open'); - - // Authenticate with ourselves - localGraphqlApi.send(JSON.stringify({ - type: 'connection_init', - payload: { - 'x-api-key': apiKey - } - })); - }); - - // Relay message back to mothership - localGraphqlApi.on('message', (data) => { - try { - relay.send(data); - } catch (error) { - // Relay socket is closed, close internal one - if (error.message.includes('WebSocket is not open')) { - localGraphqlApi.close(); - } - } - }); - - // Sub to /servers on mothership - mothershipServersEndpoint = subscribeToServers(apiKey); - }); - - // Relay is closed - relay.on('close', async function (this: WebSocketWithHeartBeat, code, _message) { + public async connect(wsServer: WebSocket.Server, currentRetryAttempt: number = 0): Promise { + const lock = await this.getLock(); try { - log.debug('Connection closed with code %s.', code); - - if (this.pingTimeout) { - clearTimeout(this.pingTimeout); - } - - // Close connection to local graphql endpoint - localGraphqlApi?.close(); - - // Clear all listeners before running this again - relay?.removeAllListeners(); - - // Stop subscriptions with mothership - mothershipServersEndpoint?.unsubscribe(); - - // Http 4XX error - if (code >= 4400 && code <= 4499) { - // Unauthorized - No API key? - if (code === 4401) { - log.debug('Invalid API key, waiting for new key...'); + const apiKey = apiManager.getKey('my_servers')?.key!; + const keyFile = varState.data?.regFile ? readFileIfExists(varState.data?.regFile).toString('base64') : ''; + const serverName = `${varState.data?.name}`; + const lanIp = networkState.data.find(network => network.ipaddr[0]).ipaddr[0] || ''; + const machineId = `${await getMachineId()}`; + + // Connect to mothership's relay endpoint + this.relay = new WebSocket(this.relayWebsocketLink, ['graphql-ws'], { + headers: { + 'x-api-key': apiKey, + 'x-flash-guid': varState.data?.flashGuid ?? '', + 'x-key-file': keyFile ?? '', + 'x-server-name': serverName, + 'x-lan-ip': lanIp, + 'x-machine-id': machineId + } + }); + + this.relay.on('open', async () => { + log.debug('Connected to mothership\'s relay via %s.', this.relayWebsocketLink); + + // Reset connection attempts + this.connectionAttempt = 0; + + // Connect to the internal graphql server + this.localGraphqlApi = new WebSocket(this.internalWsLink, ['graphql-ws']); + + // Heartbeat + this.localGraphqlApi.on('ping', () => { + if (this.localGraphqlApi) { + heartbeat.bind(this.localGraphqlApi)(); + } + }); + + // Errors + this.localGraphqlApi.on('error', error => { + Sentry.captureException(error); + log.error('ws:local-relay', 'error', error); + }); + + // Connection to local graphql endpoint is "closed" + this.localGraphqlApi.on('close', () => { + log.debug('ws:local-relay', 'close'); + }); + + // Connection to local graphql endpoint is "open" + this.localGraphqlApi.on('open', () => { + log.debug('ws:local-relay', 'open'); + + // Authenticate with ourselves + this.localGraphqlApi?.send(JSON.stringify({ + type: 'connection_init', + payload: { + 'x-api-key': apiKey + } + })); + }); + + // Relay message back to mothership + this.localGraphqlApi.on('message', (data) => { + try { + this.relay?.send(data); + } catch (error) { + // Relay socket is closed, close internal one + if (error.message.includes('WebSocket is not open')) { + this.localGraphqlApi?.close(); + } + } + }); + + // Sub to /servers on mothership + this.mothershipServersEndpoint = subscribeToServers(apiKey); + }); + + // Relay is closed + const mothership = this; + this.relay.on('close', async function (this: WebSocketWithHeartBeat, code, _message) { + try { + log.debug('Connection closed with code %s.', code); + + if (this.pingTimeout) { + clearTimeout(this.pingTimeout); + } + + // Close connection to local graphql endpoint + mothership.localGraphqlApi?.close(); + + // Clear all listeners before running this again + mothership.relay?.removeAllListeners(); + + // Stop subscriptions with mothership + mothership.mothershipServersEndpoint?.unsubscribe(); + + // Http 4XX error + if (code >= 4400 && code <= 4499) { + // Unauthorized - No API key? + if (code === 4401) { + log.debug('Invalid API key, waiting for new key...'); + return; + } + } + + // We likely closed this + // This is usually because the API key is updated + if (code === 4200) { + // Reconnect + mothership.connect(wsServer); + return; + } + + // Wait a few seconds + await sleep(backoff(mothership.connectionAttempt, ONE_MINUTE, 5)); + + // Reconnect + await mothership.connect(wsServer, currentRetryAttempt + 1); + } catch (error) { + log.error('close error', error); + } + }); + + this.relay.on('error', (error: NodeJS.ErrnoException) => { + // The relay is down + if (error.message.includes('502')) { + return; + } + + // Connection refused, aka couldn't connect + // This is usually because the address is wrong or offline + if (error.code === 'ECONNREFUSED') { + // @ts-expect-error + log.debug(`Couldn't connect to %s:%s`, error.address, error.port); return; } - } - // We likely closed this - // This is usually because the API key is updated - if (code === 4200) { - // Reconnect - connectToMothership(wsServer); - return; - } - - // Wait a few seconds - await sleep(backoff(retryAttempt, ONE_MINUTE, 5)); + if (error.toString().includes('WebSocket was closed before the connection was established')) { + log.debug(error.message); + return; + } - // Reconnect - await connectToMothership(wsServer, retryAttempt + 1); + log.error('socket error', error); + }); + + this.relay.on('ping', heartbeat); + + const sendMessage = (client, message, timeout = 1000) => { + try { + if (client.readyState === 0) { + setTimeout(() => { + sendMessage(client, message, timeout); + log.debug('Message sent to mothership.', message) + }, timeout); + return; + } + + client.send(message); + } catch (error) { + log.error('Failed replying to mothership.', error); + }; + }; + + this.relay.on('message', async (data: string) => { + try { + sendMessage(this.localGraphqlApi, data); + } catch (error) { + // Something weird happened while processing the message + // This is likely a malformed message + log.error(error); + } + }); } catch (error) { - log.error('close error', error); + } finally { + lock.release(); } - }); - - relay.on('error', (error: NodeJS.ErrnoException) => { - // The relay is down - if (error.message.includes('502')) { - return; - } - - // Connection refused, aka couldn't connect - // This is usually because the address is wrong or offline - if (error.code === 'ECONNREFUSED') { - // @ts-expect-error - log.debug(`Couldn't connect to ${error.address}:${error.port}`); - return; - } - - log.error('socket error', error); - }); - - relay.on('ping', heartbeat); - - const sendMessage = (client, message, timeout = 1000) => { + } + + async disconnect() { + const lock = await this.getLock(); try { - if (client.readyState === 0) { - setTimeout(() => { - sendMessage(client, message, timeout); - log.debug('Message sent to mothership.', message) - }, timeout); - return; + if (this.relay && (this.relay?.readyState !== this.relay?.CLOSED)) { + // 4200 === ok + this.relay.close(4200); } - - client.send(message); - } catch (error) { - log.error('Failed replying to mothership.', error); - }; - }; - - relay.on('message', async (data: string) => { - try { - sendMessage(localGraphqlApi, data); - } catch (error) { - // Something weird happened while processing the message - // This is likely a malformed message - log.error(error); + } catch(error) { + } finally { + lock.release(); } - }); + } }; -/** - * Disconnect from mothership. - */ -export const disconnectFromMothership = async () => { - if (relay && relay.readyState !== CLOSED_READY_STATE) { - log.debug('Disconnecting from the proxy server.'); - try { - // 4200 === ok - relay.close(4200); - } catch {} - } -}; \ No newline at end of file +export const mothership = new MothershipService(); \ No newline at end of file diff --git a/app/my_servers.ts b/app/my_servers.ts deleted file mode 100644 index f92bfc9b1..000000000 --- a/app/my_servers.ts +++ /dev/null @@ -1,80 +0,0 @@ -/*! - * Copyright 2019-2020 Lime Technology Inc. All rights reserved. - * Written by: Alexis Tyler - */ - -import path from 'path'; -import chokidar from 'chokidar'; -import waitFor from 'p-wait-for'; -import dotProp from 'dot-prop'; -import { utils, log, apiManager, paths, pubsub } from './core'; -import display from './graphql/resolvers/query/display'; - -const { validateApiKeyFormat, loadState } = utils; - -/** - * One second in milliseconds. - */ -const ONE_SECOND = 1000; - -export const init = async () => { - const filePath = paths.get('dynamix-config')!; - const configFilePath = path.join(paths.get('dynamix-base')!, 'case-model.cfg'); - const customImageFilePath = path.join(paths.get('dynamix-base')!, 'case-model.png'); - const getApiKey = () => dotProp.get(loadState(filePath), 'remote.apikey') as string; - - // Wait for api key to be valid - // We have to use await otherwise the module will keep loading without the apikey being added to the api manager - await waitFor(() => getApiKey() !== undefined, { - // Check every 1 second - interval: ONE_SECOND - }).then(() => { - log.debug('Found my_servers apiKey, adding to manager.'); - - // Add key to manager - apiManager.add('my_servers', getApiKey(), { - userId: '0' - }); - }); - - // Update or remove key when file changes - chokidar.watch(filePath).on('all', () => { - // Invalidate old API key - apiManager.expire('my_servers'); - - // Get current API key - const apiKey = getApiKey(); - - // Ensure API key is in the correct format - try { - validateApiKeyFormat(apiKey); - } catch (error) { - return; - } - - log.debug('my_servers API key was updated, updating ApiManager.'); - log.debug('Using %s for my_servers API key', apiKey.replace(/./g, '*')); - - process.nextTick(() => { - // Bail if we have no API key - if (apiKey === undefined) { - return; - } - - // Either add or update the key - apiManager.add('my_servers', apiKey, { - userId: '0' - }); - }); - }); - - const updatePubsub = async () => { - pubsub.publish('display', { - display: await display() - }); - }; - - // Update pub/sub when config/image file is added/updated/removed - chokidar.watch(configFilePath).on('all', updatePubsub); - chokidar.watch(customImageFilePath).on('all', updatePubsub); -}; \ No newline at end of file diff --git a/app/server.ts b/app/server.ts index 571c189a5..1621e811d 100644 --- a/app/server.ts +++ b/app/server.ts @@ -5,23 +5,31 @@ import fs from 'fs'; import net from 'net'; +import path from 'path'; import stoppable from 'stoppable'; import chokidar from 'chokidar'; import express from 'express'; import http from 'http'; -import waitFor from 'p-wait-for'; -import dotProp from 'dot-prop'; import WebSocket from 'ws'; import { ApolloServer } from 'apollo-server-express'; -import { log, config, utils, paths } from './core'; +import { log, config, utils, paths, pubsub, apiManager } from './core'; +import { getEndpoints, globalErrorHandler, exitApp } from './core/utils'; import { graphql } from './graphql'; -import { connectToMothership } from './mothership'; -import { init as loadMyServers } from './my_servers'; +import { mothership } from './mothership'; +import display from './graphql/resolvers/query/display'; -// @todo: move this -loadMyServers(); +const configFilePath = path.join(paths.get('dynamix-base')!, 'case-model.cfg'); +const customImageFilePath = path.join(paths.get('dynamix-base')!, 'case-model.png'); -const { getEndpoints, globalErrorHandler, exitApp, loadState, validateApiKeyFormat } = utils; +const updatePubsub = async () => { + pubsub.publish('display', { + display: await display() + }); +}; + +// Update pub/sub when config/image file is added/updated/removed +chokidar.watch(configFilePath).on('all', updatePubsub); +chokidar.watch(customImageFilePath).on('all', updatePubsub); /** * One second in milliseconds. @@ -165,45 +173,14 @@ export const server = { httpServer, server: stoppableServer, async start() { - const filePath = paths.get('dynamix-config')!; - const watcher = chokidar.watch(filePath); - const getApiKey = () => { - const apiKey = dotProp.get(loadState(filePath), 'remote.apikey') as string; - try { - validateApiKeyFormat(apiKey); - return apiKey; - } catch {} + // If key is in an invalid format disconnect + apiManager.on('expire', async () => { + await mothership.disconnect(); + }); - return; - }; - const reconnect = async () => { - process.nextTick(() => { - if (getApiKey() !== undefined) { - log.debug('my_servers API key was updated, restarting proxy connection.'); - connectToMothership(wsServer); - } else { - log.debug('my_servers API key was updated, invalid key found.'); - } - }); - }; - - let timeout: NodeJS.Timeout; - // If we detect an event wait 0.5s before doing anything - const startWatcher = () => { - watcher.on('all', () => { - clearTimeout(timeout); - timeout = setTimeout(reconnect, 500); - }); - }; - - // Once we have a valid key connect to the proxy server - waitFor(() => getApiKey() !== undefined, { - // Check every 1 second - interval: ONE_SECOND - }).then(async () => { - log.debug('Found my_servers apiKey, starting proxy connection.'); - await connectToMothership(wsServer); - startWatcher(); + // If key looks valid try and connect with it + apiManager.on('replace', async () => { + await mothership.connect(wsServer); }); // Start http server diff --git a/package-lock.json b/package-lock.json index 01c07e4c8..63a8a332d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2587,6 +2587,21 @@ } } }, + "async-mutex": { + "version": "0.2.4", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.2.4.tgz", + "integrity": "sha512-fcQKOXUKMQc57JlmjBCHtkKNrfGpHyR7vu18RfuLfeTAf4hK9PgOadPR5cDrBQ682zasrLUhJFe7EKAHJOduDg==", + "requires": { + "tslib": "^2.0.0" + }, + "dependencies": { + "tslib": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.0.3.tgz", + "integrity": "sha512-uZtkfKblCEQtZKBF6EBXVZeQNl82yqtDQdv+eck8u7tdPxjLu2/lp5/uPW+um2tpuxINHWy3GhiccY7QgEaVHQ==" + } + } + }, "async-retry": { "version": "1.3.1", "resolved": "https://registry.npmjs.org/async-retry/-/async-retry-1.3.1.tgz", diff --git a/package.json b/package.json index 136b9dd88..9e27eef7b 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "apollo-server": "2.18.2", "apollo-server-express": "2.18.2", "async-exit-hook": "^2.0.1", + "async-mutex": "^0.2.4", "bycontract": "^2.0.10", "bytes": "^3.1.0", "camelcase": "6.1.0", @@ -164,6 +165,7 @@ "apollo-server", "apollo-server-express", "async-exit-hook", + "async-mutex", "bycontract", "bytes", "camelcase", @@ -195,7 +197,6 @@ "graphql-type-uuid", "htpasswd-js", "ini", - "libvirt", "lodash.get", "map-obj", "merge-graphql-schemas", @@ -234,4 +235,4 @@ "uuid", "uuid-apikey" ] -} +} \ No newline at end of file