fix: ensure mothership is locked between disconnect/connect events

This commit is contained in:
Alexis Tyler
2020-11-13 16:46:31 +10:30
parent 2f0f3a75c8
commit 03773578f2
7 changed files with 332 additions and 340 deletions

View File

@@ -3,10 +3,16 @@
* Written by: Alexis Tyler
*/
import chokidar from 'chokidar';
import { EventEmitter } from 'events';
import toMillisecond from 'ms';
import dotProp from 'dot-prop';
import { Cache as MemoryCache } from 'clean-cache';
// @ts-ignore
import { validate as validateArgument } from 'bycontract';
import { validateApiKeyFormat, loadState } from './utils';
import { paths } from './paths';
import { log } from './log';
export interface CacheItem {
/** Machine readable name of the key. */
@@ -14,12 +20,12 @@ export interface CacheItem {
/** Owner's id */
userId: string;
/** The API key. */
key: string | number;
key: string;
/** When the key will expire in human readable form. This will be converted internally to ms. */
expiration: string;
}
export interface AddOptions {
export interface KeyOptions {
/** Owner's id */
userId?: string;
/** When the key will expire in human readable form. This will be converted internally to ms. */
@@ -28,28 +34,86 @@ export interface AddOptions {
interface ApiKey {
name: string;
key: string | number;
key: string;
userId: string;
expiresAt: number;
}
interface Options {
watch: boolean;
}
/**
* Api manager
*/
export class ApiManager {
export class ApiManager extends EventEmitter {
private static instance: ApiManager;
/** Note: Keys expire by default after 365 days. */
private readonly keys = new MemoryCache<CacheItem>(Number(toMillisecond('1y')));
constructor() {
constructor(options: Options = { watch: true }) {
super({
captureRejections: true
});
// Return or create the singleton class
if (ApiManager.instance) {
// This is needed as this is a singleton class
// @eslint-disable-next-line no-constructor-return
return ApiManager.instance;
}
// Create singleton
ApiManager.instance = this;
// Watch for changes to the dynamix.cfg file
// @todo Move API keys to their own file
if (options.watch) {
const basePath = paths.get('dynamix-base')!;
const configPath = paths.get('dynamix-config')!;
chokidar.watch(basePath).on('all', (eventName, filePath) => {
if (filePath === configPath) {
try {
const file = loadState<{ remote: { apikey: string } }>(filePath);
const apiKey = dotProp.get(file, 'remote.apikey') as string;
validateApiKeyFormat(apiKey);
// Add the new key
this.replace('my_servers', apiKey, {
userId: '0'
});
} catch (error) {
// File was deleted
if (error.code === 'ENOENT') {
log.debug('%s was deleted, removing "my_servers" API key.', filePath);
} else {
log.debug('%s, removing "my_servers" API key.', error.message);
}
// Reset key as it's not valid anymore
this.expire('my_servers');
};
}
});
}
}
/**
* Replace a key.
*
* Note: This will bump the expiration by the original length.
*/
replace(name: string, key: string, options: KeyOptions) {
// Delete existing key
// @ts-ignore
this.keys.items[name] = null;
// Add new key
this.add(name, key, options);
// Emit update
this.emit('replace', name, this.getKey(name));
}
/**
@@ -59,20 +123,25 @@ export class ApiManager {
*
* @memberof ApiManager
*/
add(name: string, key: string|number, options: AddOptions): void {
add(name: string, key: string, options: KeyOptions): void {
const { userId, expiration = '1y' } = options;
validateArgument(name, 'string');
validateArgument(key, 'string|number');
validateArgument(key, 'string');
validateArgument(expiration, 'string|number');
const ttl = Number(toMillisecond(expiration));
this.keys.add(name, {
const keyObject = {
name,
key,
userId
}, ttl);
};
// Add new key
this.keys.add(name, keyObject, ttl);
// Emit update
this.emit('add', name, this.getKey(name));
}
/**
@@ -83,9 +152,9 @@ export class ApiManager {
* @returns `true` if the key is valid, otherwise `false`.
* @memberof ApiManager
*/
isValid(nameOrKey: string|number, key?: string|number): boolean {
validateArgument(nameOrKey, 'string|number');
validateArgument(key, 'string|number|undefined');
isValid(nameOrKey: string, key?: string): boolean {
validateArgument(nameOrKey, 'string');
validateArgument(key, 'string|undefined');
if (!key) {
try {
@@ -149,6 +218,7 @@ export class ApiManager {
validateArgument(name, 'string');
this.keys.invalidate(name);
this.emit('expire', name);
}
/**
@@ -178,8 +248,8 @@ export class ApiManager {
* @returns The API key's machine readable name.
* @memberof ApiManager
*/
getNameFromKey(key: string|number): string {
validateArgument(key, 'string|number');
getNameFromKey(key: string): string {
validateArgument(key, 'string');
const keyObject = Object
.entries(this.keys.items)

View File

@@ -23,7 +23,7 @@ export const exitApp = (error?: Error, exitCode?: number) => {
}
// Log last error
log.error(error.message);
log.error(error);
// Kill application
process.exitCode = exitCode;

View File

@@ -1,18 +1,13 @@
import fs from 'fs';
import WebSocket from 'ws';
import * as Sentry from '@sentry/node';
import { utils, paths, states, log } from '../core';
import type { DynamixConfig } from '../core/types';
import { Mutex, MutexInterface } from 'async-mutex';
import { MOTHERSHIP_RELAY_WS_LINK, INTERNAL_WS_LINK, ONE_MINUTE } from '../consts';
import { log, apiManager } from '../core';
import { sleep, getMachineId } from '../core/utils';
import { varState, networkState } from '../core/states';
import { subscribeToServers } from './subscribe-to-servers';
const { loadState, sleep, validateApiKeyFormat } = utils;
const { varState } = states;
// Websocket closed state
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState
const CLOSED_READY_STATE = 3;
/**
* Get a number between the lowest and highest value.
* @param low Lowest value.
@@ -33,8 +28,6 @@ const backoff = (attempt: number, maxDelay: number, multiplier: number) => {
return Math.round(Math.min(delay * multiplier, maxDelay));
};
let relay: WebSocket;
interface WebSocketWithHeartBeat extends WebSocket {
pingTimeout?: NodeJS.Timeout
}
@@ -61,205 +54,221 @@ const readFileIfExists = (filePath: string) => {
return Buffer.from('');
};
/**
* Connect to unraid's proxy server
*/
export const connectToMothership = async (wsServer: WebSocket.Server, currentRetryAttempt: number = 0) => {
// Kill any existing connection before we proceed
if (relay) {
await disconnectFromMothership();
}
class MothershipService {
private relayWebsocketLink = MOTHERSHIP_RELAY_WS_LINK;
private internalWsLink = INTERNAL_WS_LINK;
let retryAttempt = currentRetryAttempt;
if (retryAttempt >= 1) {
log.debug(`Reconnecting to mothership, attempt ${retryAttempt}.`);
}
const apiKey = loadState<DynamixConfig>(paths.get('dynamix-config')!).remote.apikey || '';
const keyFile = varState.data?.regFile ? readFileIfExists(varState.data?.regFile).toString('base64') : '';
const serverName = `${varState.data?.name}`;
const lanIp = states.networkState.data.find(network => network.ipaddr[0]).ipaddr[0] || '';
const machineId = `${await utils.getMachineId()}`;
let localGraphqlApi: WebSocket;
let mothershipServersEndpoint: {
private lock?: MutexInterface;
private relay?: WebSocket;
private connectionAttempt = 0;
private localGraphqlApi?: WebSocketWithHeartBeat;
private mothershipServersEndpoint?: {
unsubscribe: () => void;
};
// Ensure API key is in the correct format
try {
validateApiKeyFormat(apiKey);
} catch (error) {
log.debug(error.message);
return;
constructor() {}
public async getLock() {
if (!this.lock) {
this.lock = new Mutex();
}
const release = await this.lock.acquire();
return {
release
};
}
// Connect to mothership's relay endpoint
// Keep reference outside this scope so we can disconnect later
relay = new WebSocket(MOTHERSHIP_RELAY_WS_LINK, ['graphql-ws'], {
headers: {
'x-api-key': apiKey,
'x-flash-guid': varState.data?.flashGuid ?? '',
'x-key-file': keyFile ?? '',
'x-server-name': serverName,
'x-lan-ip': lanIp,
'x-machine-id': machineId
}
});
relay.on('open', async () => {
log.debug(`Connected to mothership's relay via ${MOTHERSHIP_RELAY_WS_LINK}.`);
// Reset retry attempts
retryAttempt = 0;
// Connect to the internal graphql server
localGraphqlApi = new WebSocket(INTERNAL_WS_LINK, ['graphql-ws']);
// Heartbeat
localGraphqlApi.on('ping', () => {
heartbeat.bind(localGraphqlApi)();
});
// Errors
localGraphqlApi.on('error', error => {
Sentry.captureException(error);
log.error('ws:local-relay', 'error', error);
});
// Connection to local graphql endpoint is "closed"
localGraphqlApi.on('close', () => {
log.debug('ws:local-relay', 'close');
});
// Connection to local graphql endpoint is "open"
localGraphqlApi.on('open', () => {
log.debug('ws:local-relay', 'open');
// Authenticate with ourselves
localGraphqlApi.send(JSON.stringify({
type: 'connection_init',
payload: {
'x-api-key': apiKey
}
}));
});
// Relay message back to mothership
localGraphqlApi.on('message', (data) => {
try {
relay.send(data);
} catch (error) {
// Relay socket is closed, close internal one
if (error.message.includes('WebSocket is not open')) {
localGraphqlApi.close();
}
}
});
// Sub to /servers on mothership
mothershipServersEndpoint = subscribeToServers(apiKey);
});
// Relay is closed
relay.on('close', async function (this: WebSocketWithHeartBeat, code, _message) {
public async connect(wsServer: WebSocket.Server, currentRetryAttempt: number = 0): Promise<void> {
const lock = await this.getLock();
try {
log.debug('Connection closed with code %s.', code);
if (this.pingTimeout) {
clearTimeout(this.pingTimeout);
}
// Close connection to local graphql endpoint
localGraphqlApi?.close();
// Clear all listeners before running this again
relay?.removeAllListeners();
// Stop subscriptions with mothership
mothershipServersEndpoint?.unsubscribe();
// Http 4XX error
if (code >= 4400 && code <= 4499) {
// Unauthorized - No API key?
if (code === 4401) {
log.debug('Invalid API key, waiting for new key...');
const apiKey = apiManager.getKey('my_servers')?.key!;
const keyFile = varState.data?.regFile ? readFileIfExists(varState.data?.regFile).toString('base64') : '';
const serverName = `${varState.data?.name}`;
const lanIp = networkState.data.find(network => network.ipaddr[0]).ipaddr[0] || '';
const machineId = `${await getMachineId()}`;
// Connect to mothership's relay endpoint
this.relay = new WebSocket(this.relayWebsocketLink, ['graphql-ws'], {
headers: {
'x-api-key': apiKey,
'x-flash-guid': varState.data?.flashGuid ?? '',
'x-key-file': keyFile ?? '',
'x-server-name': serverName,
'x-lan-ip': lanIp,
'x-machine-id': machineId
}
});
this.relay.on('open', async () => {
log.debug('Connected to mothership\'s relay via %s.', this.relayWebsocketLink);
// Reset connection attempts
this.connectionAttempt = 0;
// Connect to the internal graphql server
this.localGraphqlApi = new WebSocket(this.internalWsLink, ['graphql-ws']);
// Heartbeat
this.localGraphqlApi.on('ping', () => {
if (this.localGraphqlApi) {
heartbeat.bind(this.localGraphqlApi)();
}
});
// Errors
this.localGraphqlApi.on('error', error => {
Sentry.captureException(error);
log.error('ws:local-relay', 'error', error);
});
// Connection to local graphql endpoint is "closed"
this.localGraphqlApi.on('close', () => {
log.debug('ws:local-relay', 'close');
});
// Connection to local graphql endpoint is "open"
this.localGraphqlApi.on('open', () => {
log.debug('ws:local-relay', 'open');
// Authenticate with ourselves
this.localGraphqlApi?.send(JSON.stringify({
type: 'connection_init',
payload: {
'x-api-key': apiKey
}
}));
});
// Relay message back to mothership
this.localGraphqlApi.on('message', (data) => {
try {
this.relay?.send(data);
} catch (error) {
// Relay socket is closed, close internal one
if (error.message.includes('WebSocket is not open')) {
this.localGraphqlApi?.close();
}
}
});
// Sub to /servers on mothership
this.mothershipServersEndpoint = subscribeToServers(apiKey);
});
// Relay is closed
const mothership = this;
this.relay.on('close', async function (this: WebSocketWithHeartBeat, code, _message) {
try {
log.debug('Connection closed with code %s.', code);
if (this.pingTimeout) {
clearTimeout(this.pingTimeout);
}
// Close connection to local graphql endpoint
mothership.localGraphqlApi?.close();
// Clear all listeners before running this again
mothership.relay?.removeAllListeners();
// Stop subscriptions with mothership
mothership.mothershipServersEndpoint?.unsubscribe();
// Http 4XX error
if (code >= 4400 && code <= 4499) {
// Unauthorized - No API key?
if (code === 4401) {
log.debug('Invalid API key, waiting for new key...');
return;
}
}
// We likely closed this
// This is usually because the API key is updated
if (code === 4200) {
// Reconnect
mothership.connect(wsServer);
return;
}
// Wait a few seconds
await sleep(backoff(mothership.connectionAttempt, ONE_MINUTE, 5));
// Reconnect
await mothership.connect(wsServer, currentRetryAttempt + 1);
} catch (error) {
log.error('close error', error);
}
});
this.relay.on('error', (error: NodeJS.ErrnoException) => {
// The relay is down
if (error.message.includes('502')) {
return;
}
// Connection refused, aka couldn't connect
// This is usually because the address is wrong or offline
if (error.code === 'ECONNREFUSED') {
// @ts-expect-error
log.debug(`Couldn't connect to %s:%s`, error.address, error.port);
return;
}
}
// We likely closed this
// This is usually because the API key is updated
if (code === 4200) {
// Reconnect
connectToMothership(wsServer);
return;
}
// Wait a few seconds
await sleep(backoff(retryAttempt, ONE_MINUTE, 5));
if (error.toString().includes('WebSocket was closed before the connection was established')) {
log.debug(error.message);
return;
}
// Reconnect
await connectToMothership(wsServer, retryAttempt + 1);
log.error('socket error', error);
});
this.relay.on('ping', heartbeat);
const sendMessage = (client, message, timeout = 1000) => {
try {
if (client.readyState === 0) {
setTimeout(() => {
sendMessage(client, message, timeout);
log.debug('Message sent to mothership.', message)
}, timeout);
return;
}
client.send(message);
} catch (error) {
log.error('Failed replying to mothership.', error);
};
};
this.relay.on('message', async (data: string) => {
try {
sendMessage(this.localGraphqlApi, data);
} catch (error) {
// Something weird happened while processing the message
// This is likely a malformed message
log.error(error);
}
});
} catch (error) {
log.error('close error', error);
} finally {
lock.release();
}
});
relay.on('error', (error: NodeJS.ErrnoException) => {
// The relay is down
if (error.message.includes('502')) {
return;
}
// Connection refused, aka couldn't connect
// This is usually because the address is wrong or offline
if (error.code === 'ECONNREFUSED') {
// @ts-expect-error
log.debug(`Couldn't connect to ${error.address}:${error.port}`);
return;
}
log.error('socket error', error);
});
relay.on('ping', heartbeat);
const sendMessage = (client, message, timeout = 1000) => {
}
async disconnect() {
const lock = await this.getLock();
try {
if (client.readyState === 0) {
setTimeout(() => {
sendMessage(client, message, timeout);
log.debug('Message sent to mothership.', message)
}, timeout);
return;
if (this.relay && (this.relay?.readyState !== this.relay?.CLOSED)) {
// 4200 === ok
this.relay.close(4200);
}
client.send(message);
} catch (error) {
log.error('Failed replying to mothership.', error);
};
};
relay.on('message', async (data: string) => {
try {
sendMessage(localGraphqlApi, data);
} catch (error) {
// Something weird happened while processing the message
// This is likely a malformed message
log.error(error);
} catch(error) {
} finally {
lock.release();
}
});
}
};
/**
* Disconnect from mothership.
*/
export const disconnectFromMothership = async () => {
if (relay && relay.readyState !== CLOSED_READY_STATE) {
log.debug('Disconnecting from the proxy server.');
try {
// 4200 === ok
relay.close(4200);
} catch {}
}
};
export const mothership = new MothershipService();

View File

@@ -1,80 +0,0 @@
/*!
* Copyright 2019-2020 Lime Technology Inc. All rights reserved.
* Written by: Alexis Tyler
*/
import path from 'path';
import chokidar from 'chokidar';
import waitFor from 'p-wait-for';
import dotProp from 'dot-prop';
import { utils, log, apiManager, paths, pubsub } from './core';
import display from './graphql/resolvers/query/display';
const { validateApiKeyFormat, loadState } = utils;
/**
* One second in milliseconds.
*/
const ONE_SECOND = 1000;
export const init = async () => {
const filePath = paths.get('dynamix-config')!;
const configFilePath = path.join(paths.get('dynamix-base')!, 'case-model.cfg');
const customImageFilePath = path.join(paths.get('dynamix-base')!, 'case-model.png');
const getApiKey = () => dotProp.get(loadState(filePath), 'remote.apikey') as string;
// Wait for api key to be valid
// We have to use await otherwise the module will keep loading without the apikey being added to the api manager
await waitFor(() => getApiKey() !== undefined, {
// Check every 1 second
interval: ONE_SECOND
}).then(() => {
log.debug('Found my_servers apiKey, adding to manager.');
// Add key to manager
apiManager.add('my_servers', getApiKey(), {
userId: '0'
});
});
// Update or remove key when file changes
chokidar.watch(filePath).on('all', () => {
// Invalidate old API key
apiManager.expire('my_servers');
// Get current API key
const apiKey = getApiKey();
// Ensure API key is in the correct format
try {
validateApiKeyFormat(apiKey);
} catch (error) {
return;
}
log.debug('my_servers API key was updated, updating ApiManager.');
log.debug('Using %s for my_servers API key', apiKey.replace(/./g, '*'));
process.nextTick(() => {
// Bail if we have no API key
if (apiKey === undefined) {
return;
}
// Either add or update the key
apiManager.add('my_servers', apiKey, {
userId: '0'
});
});
});
const updatePubsub = async () => {
pubsub.publish('display', {
display: await display()
});
};
// Update pub/sub when config/image file is added/updated/removed
chokidar.watch(configFilePath).on('all', updatePubsub);
chokidar.watch(customImageFilePath).on('all', updatePubsub);
};

View File

@@ -5,23 +5,31 @@
import fs from 'fs';
import net from 'net';
import path from 'path';
import stoppable from 'stoppable';
import chokidar from 'chokidar';
import express from 'express';
import http from 'http';
import waitFor from 'p-wait-for';
import dotProp from 'dot-prop';
import WebSocket from 'ws';
import { ApolloServer } from 'apollo-server-express';
import { log, config, utils, paths } from './core';
import { log, config, utils, paths, pubsub, apiManager } from './core';
import { getEndpoints, globalErrorHandler, exitApp } from './core/utils';
import { graphql } from './graphql';
import { connectToMothership } from './mothership';
import { init as loadMyServers } from './my_servers';
import { mothership } from './mothership';
import display from './graphql/resolvers/query/display';
// @todo: move this
loadMyServers();
const configFilePath = path.join(paths.get('dynamix-base')!, 'case-model.cfg');
const customImageFilePath = path.join(paths.get('dynamix-base')!, 'case-model.png');
const { getEndpoints, globalErrorHandler, exitApp, loadState, validateApiKeyFormat } = utils;
const updatePubsub = async () => {
pubsub.publish('display', {
display: await display()
});
};
// Update pub/sub when config/image file is added/updated/removed
chokidar.watch(configFilePath).on('all', updatePubsub);
chokidar.watch(customImageFilePath).on('all', updatePubsub);
/**
* One second in milliseconds.
@@ -165,45 +173,14 @@ export const server = {
httpServer,
server: stoppableServer,
async start() {
const filePath = paths.get('dynamix-config')!;
const watcher = chokidar.watch(filePath);
const getApiKey = () => {
const apiKey = dotProp.get(loadState(filePath), 'remote.apikey') as string;
try {
validateApiKeyFormat(apiKey);
return apiKey;
} catch {}
// If key is in an invalid format disconnect
apiManager.on('expire', async () => {
await mothership.disconnect();
});
return;
};
const reconnect = async () => {
process.nextTick(() => {
if (getApiKey() !== undefined) {
log.debug('my_servers API key was updated, restarting proxy connection.');
connectToMothership(wsServer);
} else {
log.debug('my_servers API key was updated, invalid key found.');
}
});
};
let timeout: NodeJS.Timeout;
// If we detect an event wait 0.5s before doing anything
const startWatcher = () => {
watcher.on('all', () => {
clearTimeout(timeout);
timeout = setTimeout(reconnect, 500);
});
};
// Once we have a valid key connect to the proxy server
waitFor(() => getApiKey() !== undefined, {
// Check every 1 second
interval: ONE_SECOND
}).then(async () => {
log.debug('Found my_servers apiKey, starting proxy connection.');
await connectToMothership(wsServer);
startWatcher();
// If key looks valid try and connect with it
apiManager.on('replace', async () => {
await mothership.connect(wsServer);
});
// Start http server

15
package-lock.json generated
View File

@@ -2587,6 +2587,21 @@
}
}
},
"async-mutex": {
"version": "0.2.4",
"resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.2.4.tgz",
"integrity": "sha512-fcQKOXUKMQc57JlmjBCHtkKNrfGpHyR7vu18RfuLfeTAf4hK9PgOadPR5cDrBQ682zasrLUhJFe7EKAHJOduDg==",
"requires": {
"tslib": "^2.0.0"
},
"dependencies": {
"tslib": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.0.3.tgz",
"integrity": "sha512-uZtkfKblCEQtZKBF6EBXVZeQNl82yqtDQdv+eck8u7tdPxjLu2/lp5/uPW+um2tpuxINHWy3GhiccY7QgEaVHQ=="
}
}
},
"async-retry": {
"version": "1.3.1",
"resolved": "https://registry.npmjs.org/async-retry/-/async-retry-1.3.1.tgz",

View File

@@ -40,6 +40,7 @@
"apollo-server": "2.18.2",
"apollo-server-express": "2.18.2",
"async-exit-hook": "^2.0.1",
"async-mutex": "^0.2.4",
"bycontract": "^2.0.10",
"bytes": "^3.1.0",
"camelcase": "6.1.0",
@@ -164,6 +165,7 @@
"apollo-server",
"apollo-server-express",
"async-exit-hook",
"async-mutex",
"bycontract",
"bytes",
"camelcase",
@@ -195,7 +197,6 @@
"graphql-type-uuid",
"htpasswd-js",
"ini",
"libvirt",
"lodash.get",
"map-obj",
"merge-graphql-schemas",
@@ -234,4 +235,4 @@
"uuid",
"uuid-apikey"
]
}
}