chore: fix lint issues

This commit is contained in:
Alexis Tyler
2021-01-28 17:15:06 +10:30
parent 4e1b0bd72c
commit 6a8f4ffd33
32 changed files with 342 additions and 289 deletions

View File

@@ -11,9 +11,9 @@ import dotProp from 'dot-prop';
import { Cache as MemoryCache } from 'clean-cache';
import { validate as validateArgument } from 'bycontract';
import { Mutex, MutexInterface } from 'async-mutex';
import { validateApiKeyFormat, loadState, validateApiKey } from './utils';
import { validateApiKeyFormat, loadState, validateApiKey, isNodeError } from './utils';
import { paths } from './paths';
import { coreLogger } from './log';
import { coreLogger, log } from './log';
export interface CacheItem {
/** Machine readable name of the key. */
@@ -54,53 +54,6 @@ export class ApiManager extends EventEmitter {
private readonly keys = new MemoryCache<CacheItem>(Number(toMillisecond('1y')));
private lock?: MutexInterface;
private async getLock() {
if (!this.lock) {
this.lock = new Mutex();
}
const release = await this.lock.acquire();
return {
release
};
}
private async checkKey(filePath: string, force = false) {
const lock = await this.getLock();
try {
const file = loadState<{ remote: { apikey: string } }>(filePath);
const apiKey = dotProp.get(file, 'remote.apikey')! as string;
// Same key as current
if (!force && (apiKey === this.getKey('my_servers')?.key)) {
coreLogger.debug('%s was updated but the API key didn\'t change', filePath);
return;
}
// Ensure key format is valid before validating
validateApiKeyFormat(apiKey);
// Ensure key is valid before connecting
await validateApiKey(apiKey);
// Add the new key
this.replace('my_servers', apiKey, {
userId: '0'
});
} catch (error) {
// File was deleted
if (error.code === 'ENOENT') {
coreLogger.debug('%s was deleted, removing "my_servers" API key.', filePath);
} else {
coreLogger.debug('%s, removing "my_servers" API key.', error.message);
}
// Reset key as it's not valid anymore
this.expire('my_servers');
} finally {
lock.release();
}
}
constructor(options: Options = { watch: true }) {
super({
@@ -109,7 +62,7 @@ export class ApiManager extends EventEmitter {
// Return or create the singleton class
if (ApiManager.instance) {
// @eslint-disable-next-line no-constructor-return
// eslint-disable-next-line no-constructor-return
return ApiManager.instance;
}
@@ -130,7 +83,9 @@ export class ApiManager extends EventEmitter {
}
// Load inital keys in
this.checkKey(configPath, true);
this.checkKey(configPath, true).catch(error => {
log.debug('Failing loading inital keys');
});
}
/**
@@ -268,7 +223,6 @@ export class ApiManager extends EventEmitter {
.filter(([, item]) => this.isValid(item.value.key))
.map(([name, item]) => ({
name,
// @ts-expect-error
key: item.value.key,
userId: item.value.userId,
expiresAt: item.expiresAt
@@ -287,7 +241,6 @@ export class ApiManager extends EventEmitter {
const keyObject = Object
.entries(this.keys.items)
// @ts-expect-error
.find(([_, item]) => item.value.key === key);
if (!keyObject) {
@@ -296,6 +249,56 @@ export class ApiManager extends EventEmitter {
return keyObject[0];
}
private async getLock() {
if (!this.lock) {
this.lock = new Mutex();
}
const release = await this.lock.acquire();
return {
release
};
}
private async checkKey(filePath: string, force = false) {
const lock = await this.getLock();
try {
const file = loadState<{ remote: { apikey: string } }>(filePath);
const apiKey = dotProp.get(file, 'remote.apikey')! as string;
// Same key as current
if (!force && (apiKey === this.getKey('my_servers')?.key)) {
coreLogger.debug('%s was updated but the API key didn\'t change', filePath);
return;
}
// Ensure key format is valid before validating
validateApiKeyFormat(apiKey);
// Ensure key is valid before connecting
await validateApiKey(apiKey);
// Add the new key
this.replace('my_servers', apiKey, {
userId: '0'
});
} catch (error: unknown) {
if (isNodeError(error)) {
// File was deleted
if (error?.code === 'ENOENT') {
coreLogger.debug('%s was deleted, removing "my_servers" API key.', filePath);
} else {
coreLogger.debug('%s, removing "my_servers" API key.', error.message);
}
}
// Reset key as it's not valid anymore
this.expire('my_servers');
} finally {
lock.release();
}
}
}
export const apiManager = new ApiManager();

View File

@@ -243,7 +243,7 @@ export const loadServer = async (name: string, server: typeof Server): Promise<v
coreLogger.debug('Stopping server');
// Stop the server
await server.stop();
server.stop();
});
};

View File

@@ -45,7 +45,7 @@ export const user = {
extends: 'guest',
permissions: [
{ resource: 'apikey', action: 'read:own', attributes: '*' },
{ resource: 'permission', action: 'read:any', attributes: '*' }
{ resource: 'permission', action: 'read:any', attributes: '*' }
]
};

View File

@@ -7,9 +7,9 @@ import { AppError } from '../errors';
* Announce to the local network via mDNS.
*/
export const announce = async (): Promise<void> => {
const name = varState.data?.name;
const localTld = varState.data?.localTld;
const version = varState.data?.version;
const name: string = varState.data?.name;
const localTld: string = varState.data?.localTld;
const version: string = varState.data?.version;
if (!name || !localTld || !version) {
throw new AppError('Missing require fields to announce.');

View File

@@ -4,14 +4,20 @@ import { log, discoveryLogger } from '../log';
/**
* Listen to devices on the local network via mDNS.
*/
export const listen = (): void => {
export const listen = async () => {
stw
.on('up', service => {
if (service.type === 'unraid') {
if (service.txt?.is_setup === 'false') {
const ipv4 = service.addresses.find(address => address.includes('.'));
const ipv6 = service.addresses.find(address => address.includes(':'));
discoveryLogger.info(`Found a new local server [${ipv4 ?? ipv6}], visit your my servers dashboard to claim.`);
const ipAddress = ipv4 ?? ipv6;
// No ip?
if (!ipAddress) {
return;
}
discoveryLogger.info(`Found a new local server [${ipAddress}], visit your my servers dashboard to claim.`);
}
}
// Console.log(`${service.name} is up! (from ${referrer.address}`);
@@ -20,5 +26,5 @@ export const listen = (): void => {
discoveryLogger.debug(`${remoteService.name} is down! (from ${referrer.address})`);
});
stw.listen();
await stw.listen();
};

View File

@@ -9,6 +9,7 @@ import { AppError } from './app-error';
* API key error.
*/
export class ApiKeyError extends AppError {
// eslint-disable-next-line @typescript-eslint/no-useless-constructor
constructor(message: string) {
super(message);
}

View File

@@ -8,10 +8,10 @@
*/
export class AppError extends Error {
/** The HTTP status associated with this error. */
status: number;
public status: number;
/** Should we kill the application when thrown. */
fatal = false;
public fatal = false;
constructor(message: string, status?: number) {
// Calling parent constructor of base Error class.

View File

@@ -3,14 +3,15 @@
* Written by: Alexis Tyler
*/
import { format } from 'util';
import { AppError } from './app-error';
/**
* Invalid param provided to module
*/
export class ParamInvalidError extends AppError {
constructor(parameterName: string, parameter) {
constructor(parameterName: string, parameter: any) {
// Overriding both message and status code.
super(`Param invalid: ${parameterName} = ${parameter}`, 500);
super(format('Param invalid: %s = %s', parameterName, parameter), 500);
}
}

View File

@@ -5,9 +5,8 @@
import { CoreContext, CoreResult } from '../types';
import { ParamInvalidError } from '../errors';
import { pluginManager } from '../plugin-manager';
import { Plugin, pluginManager } from '../plugin-manager';
import { ensurePermission } from '../utils';
import { Plugin } from '../plugin-manager';
interface Context extends CoreContext {
readonly query: {

View File

@@ -179,7 +179,7 @@ export class PluginManager {
try {
coreLogger.debug('Plugin "%s" loading main file.', pluginName);
plugin = require(packageMainPath);
} catch (error) {
} catch (error: unknown) {
coreLogger.error('Plugin "%s" failed to load: %s', pluginName, error);
// Disable plugin as it failed to load it's init file

View File

@@ -64,8 +64,8 @@ const parse = (state: NetworkIni) => {
* Network
*/
class Network extends ArrayState {
public channel = 'network';
private static instance: Network;
public channel = 'network';
constructor() {
super();

View File

@@ -70,8 +70,8 @@ const parse = (state: SlotIni[]) => {
* Slots
*/
class Slots extends ArrayState {
public channel = 'slots';
private static instance: Slots;
public channel = 'slots';
constructor() {
super();

View File

@@ -59,8 +59,8 @@ const parse = (state: SmbSecIni[]) => {
};
class SmbSec extends ArrayState {
public channel = 'smb-sec';
private static instance: SmbSec;
public channel = 'smb-sec';
constructor() {
super();

View File

@@ -36,8 +36,8 @@ const parseUser = (state: UserIni): User => {
const parse = (states: UserIni[]): User[] => Object.values(states).map(parseUser);
class Users extends ArrayState {
public channel = 'users';
private static instance: Users;
public channel = 'users';
constructor() {
super();

View File

@@ -277,13 +277,14 @@ interface ParseOptions {
}
class VarState extends State {
public channel = 'var';
private static instance: VarState;
public channel = 'var';
constructor() {
super();
if (VarState.instance) {
// eslint-ignore-next-line no-constructor-return
return VarState.instance;
}

View File

@@ -25,4 +25,4 @@ const client = new Docker({
/**
* Docker client
*/
export const docker = (pify(client) as Promisify<typeof client>);
export const docker = (pify(client));

View File

@@ -8,13 +8,12 @@ import fetch from 'node-fetch';
import { debugTimer, parseConfig, sleep } from '..';
import * as states from '../../states';
import { coreLogger } from '../../log';
import { varState } from '../../states';
import { AppError } from '../../errors';
const data = {};
const getSubEndpoint = () => {
const httpPort = varState.data?.port;
const httpPort = states.varState.data?.port;
return `http://localhost:${httpPort}/sub`;
};

View File

@@ -5,9 +5,9 @@ import { FileMissingError } from '../../errors';
const cache = new CacheManager('unraid:utils:misc/get-machine-id');
export const getMachineId = async () => {
export const getMachineId = async (): Promise<string> => {
const path = paths.get('machine-id');
let machineId = cache.get('machine-id');
let machineId: string = cache.get('machine-id');
if (!path) {
const error = new FileMissingError('/etc/machine-id');

View File

@@ -15,13 +15,13 @@ import { exitApp } from '..';
export const globalErrorHandler = (error: Error) => {
try {
exitApp(error, 1);
} catch (error_) {
} catch (error: unknown) {
// We should only end up here if `Errors` or `Core.log` have an issue loading.
// Log last error
console.error(error_);
console.error(error);
// Kill application
process.exit(1); // eslint-disable-line unicorn/no-process-exit
process.exit(1);
}
};

View File

@@ -101,7 +101,6 @@ export const parseConfig = <T>(options: Options): T => {
let data: Record<string, any>;
if (filePath) {
data = multiIniRead(filePath, {
// eslint-disable-next-line camelcase
keep_quotes: false
});
} else {
@@ -117,7 +116,7 @@ export const parseConfig = <T>(options: Options): T => {
// Remove quotes around keys
const dataWithoutQuoteKeys = mapObject(data, (key, value) => {
// @SEE: https://stackoverflow.com/a/19156197/2311366
return [(key as string).replace(/^"(.+(?="$))"$/, '$1'), value];
return [(key).replace(/^"(.+(?="$))"$/, '$1'), value];
});
// Result object with array items as actual arrays

View File

@@ -2,3 +2,4 @@
export * from './context';
export * from './has-fields';
export * from './is-node-error';

View File

@@ -0,0 +1,6 @@
/**
* A typeguarded version of `instanceof Error` for NodeJS.
*/
export function isNodeError<T extends new (...args: any[]) => Error>(value: unknown, errorType?: T): value is InstanceType<T> & NodeJS.ErrnoException {
return value instanceof (errorType ? errorType : Error);
}

View File

@@ -37,7 +37,7 @@ export const states = () => {
// Reload state
try {
state.reset();
} catch (error) {
} catch (error: unknown) {
coreLogger.error('failed resetting state', error);
}
};

View File

@@ -19,6 +19,7 @@ import { typeDefs } from './schema';
import * as resolvers from './resolvers';
import { wsHasConnected, wsHasDisconnected } from '../ws';
import { MOTHERSHIP_RELAY_WS_LINK } from '../consts';
import { isNodeError } from '../core/utils';
const baseTypes = [gql`
scalar JSON
@@ -198,10 +199,12 @@ class FuncDirective extends SchemaDirectiveVisitor {
} else {
func = getCoreModule(moduleName);
}
} catch (error) {
// Rethrow clean error message about module being missing
if (error.code === 'MODULE_NOT_FOUND') {
throw new AppError(`Cannot find ${pluginName ? 'Plugin: "' + pluginName + '" ' : ''}Module: "${pluginName ? pluginModuleName : moduleName}"`);
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
// Rethrow clean error message about module being missing
if (error.code === 'MODULE_NOT_FOUND') {
throw new AppError(`Cannot find ${pluginName ? 'Plugin: "' + pluginName + '" ' : ''}Module: "${pluginName ? pluginModuleName : moduleName}"`);
}
}
// In production let's just throw an internal error
@@ -389,7 +392,7 @@ export const graphql = {
user,
websocketId
}); return;
} catch (error) {
} catch (error: unknown) {
reject(error);
}
}),

View File

@@ -125,7 +125,7 @@ export default async () => {
url: serverCase
}
};
} catch (error) {
} catch (error: unknown) {
return {
case: states.couldNotReadImage
};

View File

@@ -3,7 +3,7 @@ import { Mutex, MutexInterface } from 'async-mutex';
import { ONE_SECOND, ONE_MINUTE } from '../consts';
import { log } from '../core';
import { AppError } from '../core/errors';
import { sleep } from '../core/utils';
import { isNodeError, sleep } from '../core/utils';
import { backoff } from './utils';
export interface WebSocketWithHeartBeat extends WebSocket {
@@ -52,7 +52,11 @@ export class CustomSocket {
// Connect right away
if (!options.lazy) {
this.connect();
this.connect().catch((error: unknown) => {
if (isNodeError(error)) {
log.error('Failed connecting with error %s', error.message);
}
});
}
}
@@ -83,19 +87,66 @@ export class CustomSocket {
// Reset connection attempts
customSocket.connectionAttempts = 0;
} catch (error) {
this.close(error.code.length === 4 ? error.code : `4${error.code}`, JSON.stringify({
message: error.message ?? 'Internal Server Error'
}));
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
this.close(Number(error.code?.length === 4 ? error.code : `4${error.code}`), JSON.stringify({
message: error.message ?? 'Internal Server Error'
}));
}
}
};
}
protected onDisconnect() {
const responses = {
// OK
4200: async () => {
// This is usually because the API key is updated
// Let's reset the reconnect count so we reconnect instantly
customSocket.connectionAttempts = 0;
},
// Unauthorized - Invalid/missing API key.
4401: async () => {
customSocket.logger.debug('Invalid API key, waiting for new key...');
},
// Rate limited
4429: async message => {
try {
let interval: NodeJS.Timeout | undefined;
const retryAfter = parseInt(message['Retry-After'], 10) || 30;
customSocket.logger.debug('Rate limited, retrying after %ss', retryAfter);
// Less than 30s
if (retryAfter <= 30) {
let seconds = retryAfter;
// Print retry once per second
interval = setInterval(() => {
seconds--;
customSocket.logger.debug('Retrying connection in %ss', seconds);
}, ONE_SECOND);
}
if (retryAfter >= 1) {
await sleep(ONE_SECOND * retryAfter);
}
if (interval) {
clearInterval(interval);
}
} catch {}
},
// Server Error
4500: async () => {
// Something went wrong on the connection
// Let's wait an extra bit
await sleep(ONE_SECOND * 5);
}
};
const customSocket = this;
return async function (this: WebSocketWithHeartBeat, code: number, _message: string) {
try {
const message = _message.trim() === '' ? { message: '' } : JSON.parse(_message);
const message: { message?: string } = _message.trim() === '' ? { message: '' } : JSON.parse(_message);
customSocket.logger.debug('Connection closed with code=%s reason="%s"', code, code === 1006 ? 'Terminated' : message.message);
// Stop ws heartbeat
@@ -103,58 +154,17 @@ export class CustomSocket {
clearTimeout(this.heartbeat);
}
// Http 4XX error
if (code >= 4400 && code <= 4499) {
// Unauthorized - Invalid/missing API key.
if (code === 4401) {
customSocket.logger.debug('Invalid API key, waiting for new key...');
return;
}
// Rate limited
if (code === 4429) {
try {
let interval: NodeJS.Timeout | undefined;
const retryAfter = parseInt(message['Retry-After'], 10) || 30;
customSocket.logger.debug('Rate limited, retrying after %ss', retryAfter);
// Less than 30s
if (retryAfter <= 30) {
let seconds = retryAfter;
// Print retry once per second
interval = setInterval(() => {
seconds--;
customSocket.logger.debug('Retrying connection in %ss', seconds);
}, ONE_SECOND);
}
if (retryAfter >= 1) {
await sleep(ONE_SECOND * retryAfter);
}
if (interval) {
clearInterval(interval);
}
} catch {}
}
// Known status code
if (Object.keys(responses).includes(`${code}`)) {
await responses[code](message);
} else {
// Unknown status code
await responses[4500]();
}
// We likely closed this
// This is usually because the API key is updated
if (code === 4200) {
// Reconnect
customSocket.connect();
return;
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
customSocket.logger.debug('Connection closed with code=%s reason="%s"', code, error.message);
}
// Something went wrong on the connection
// Let's wait an extra bit
if (code === 4500) {
await sleep(ONE_SECOND * 5);
}
} catch (error) {
customSocket.logger.debug('Connection closed with code=%s reason="%s"', code, error.message);
}
try {
@@ -163,15 +173,17 @@ export class CustomSocket {
// Reconnect
await customSocket.connect(customSocket.connectionAttempts + 1);
} catch (error) {
customSocket.logger.debug('Failed reconnecting to %s reason="%s"', customSocket.uri, error.message);
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
customSocket.logger.debug('Failed reconnecting to %s reason="%s"', customSocket.uri, error.message);
}
}
};
}
public onMessage() {
const customSocket = this;
return async function (message: string, ...args) {
return async function (message: string, ...args: any[]) {
customSocket.logger.silly('message="%s" args="%s"', message, ...args);
};
}
@@ -217,8 +229,10 @@ export class CustomSocket {
// Failed replying as socket isn't open
this.logger.error('Failed replying to %s. state=%s message="%s"', client?.url, client.readyState, message);
} catch (error) {
this.logger.error('Failed replying to %s.', client?.url, error);
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
this.logger.error('Failed replying to %s.', client?.url, error);
}
}
}
@@ -276,8 +290,10 @@ export class CustomSocket {
// Log we connected
this.logger.debug('Connected to %s', this.uri);
} catch (error) {
this.logger.error('Failed connecting reason=%s', error.message);
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
this.logger.error('Failed connecting reason=%s', error.message);
}
} finally {
lock.release();
}
@@ -290,8 +306,10 @@ export class CustomSocket {
// 4200 === ok
this.connection.close(4200);
}
} catch (error) {
this.logger.error('Failed disconnecting reason=%s', error.message);
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
this.logger.error('Failed disconnecting reason=%s', error.message);
}
} finally {
lock.release();
}

View File

@@ -1,6 +1,6 @@
import { INTERNAL_WS_LINK } from '../../consts';
import { apiManager, relayLogger } from '../../core';
import { sleep } from '../../core/utils';
import { isNodeError, sleep } from '../../core/utils';
import { AppError } from '../../core/errors';
import { CustomSocket, WebSocketWithHeartBeat } from '../custom-socket';
import { MothershipSocket } from './mothership';
@@ -17,33 +17,27 @@ export class InternalGraphql extends CustomSocket {
});
}
protected async getApiKey() {
const key = apiManager.getKey('my_servers');
if (!key) {
throw new AppError('No API key found.');
}
return key.key;
}
onMessage() {
const internalGraphql = this;
const mothership = this.mothership;
return async function (this: WebSocketWithHeartBeat, data: string) {
try {
internalGraphql.mothership?.connection?.send(data);
} catch (error) {
// Relay socket is closed, close internal one
if (error.message.includes('WebSocket is not open')) {
this.close(4200, JSON.stringify({
message: error.emss
}));
mothership?.connection?.send(data);
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
// Relay socket is closed, close internal one
if (error.message.includes('WebSocket is not open')) {
this.close(4200, JSON.stringify({
message: error.message
}));
}
}
}
};
}
onError() {
const internalGraphql = this;
const connect = this.connect.bind(this);
const logger = this.logger;
return async function (error: NodeJS.ErrnoException) {
if (error.message === 'WebSocket was closed before the connection was established') {
// Likely the internal relay-ws connection was started but then mothership
@@ -59,19 +53,19 @@ export class InternalGraphql extends CustomSocket {
await sleep(1000);
// Re-connect to internal graphql server
internalGraphql.connect();
connect();
return;
}
internalGraphql.logger.error(error);
logger.error(error);
};
}
onConnect() {
const internalGraphql = this;
const apiKey = this.apiKey;
return async function (this: WebSocketWithHeartBeat) {
// No API key, close internal connection
if (!internalGraphql.apiKey) {
if (!apiKey) {
this.close(4200, JSON.stringify({
message: 'No API key'
}));
@@ -81,9 +75,18 @@ export class InternalGraphql extends CustomSocket {
this.send(JSON.stringify({
type: 'connection_init',
payload: {
'x-api-key': internalGraphql.apiKey
'x-api-key': apiKey
}
}));
};
}
protected async getApiKey() {
const key = apiManager.getKey('my_servers');
if (!key) {
throw new AppError('No API key found.');
}
return key.key;
}
}

View File

@@ -1,6 +1,6 @@
import { MOTHERSHIP_RELAY_WS_LINK, ONE_MINUTE } from '../../consts';
import { mothershipLogger, apiManager } from '../../core';
import { getMachineId, sleep } from '../../core/utils';
import { getMachineId, isNodeError, sleep } from '../../core/utils';
import { varState, networkState } from '../../core/states';
import { subscribeToServers } from '../subscribe-to-servers';
import { AppError } from '../../core/errors';
@@ -24,16 +24,107 @@ export class MothershipSocket extends CustomSocket {
});
}
private connectToInternalGraphql(options: InternalGraphql['options'] = {}) {
this.internalGraphqlSocket = new InternalGraphql(options);
onConnect() {
const connectToInternalGraphql = this.connectToInternalGraphql.bind(this);
const connectToMothershipsGraphql = this.connectToMothershipsGraphql.bind(this);
const onConnect = super.onConnect.bind(this);
return async function (this: WebSocketWithHeartBeat) {
try {
// Run super
onConnect();
// Connect to local graphql
connectToInternalGraphql();
// Sub to /servers on mothership
await connectToMothershipsGraphql();
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
const code = (error.code) ?? 500;
this.close(`${code}`.length === 4 ? Number(code) : Number(`4${code}`), JSON.stringify({
message: error.message ?? 'Internal Server Error'
}));
}
}
};
}
private async connectToMothershipsGraphql() {
this.mothershipServersEndpoint = await subscribeToServers(this.apiKey);
onDisconnect() {
const internalGraphqlSocket = this.internalGraphqlSocket;
const disconnectFromMothershipsGraphql = this.disconnectFromMothershipsGraphql.bind(this);
const logger = this.logger;
const onDisconnect = super.onDisconnect.bind(this);
return async function (this: WebSocketWithHeartBeat, code: number, _message: string) {
try {
// Close connection to local graphql endpoint
internalGraphqlSocket?.connection?.close(200);
// Close connection to motherships's server's endpoint
await disconnectFromMothershipsGraphql();
// Process disconnection
onDisconnect();
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
logger.debug('Connection closed with code=%s reason="%s"', code, error.message);
}
}
};
}
private async disconnectFromMothershipsGraphql() {
this.mothershipServersEndpoint?.unsubscribe();
// When we get a message from relay send it through to our local graphql instance
onMessage() {
const internalGraphqlSocket = this.internalGraphqlSocket;
const sendMessage = this.sendMessage.bind(this);
const logger = this.logger;
return async function (this: WebSocketWithHeartBeat, data: string) {
try {
await sendMessage(internalGraphqlSocket?.connection, data);
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
// Something weird happened while processing the message
// This is likely a malformed message
logger.error('Failed sending message to relay.', error);
}
}
};
}
onError() {
const logger = this.logger;
return async function (this: WebSocketWithHeartBeat, error: NodeJS.ErrnoException) {
try {
logger.error(error);
// The relay is down
if (error.message.includes('502')) {
// Sleep for 30 seconds
await sleep(ONE_MINUTE / 2);
}
// Connection refused, aka couldn't connect
// This is usually because the address is wrong or offline
if (error.code === 'ECONNREFUSED') {
// @ts-expect-error
logger.debug('Couldn\'t connect to %s:%s', error.address, error.port);
return;
}
// Closed before connection started
if (error.message.toString().includes('WebSocket was closed before the connection was established')) {
logger.debug(error.message);
return;
}
throw error;
} catch {
// Unknown error
logger.error('socket error', error);
} finally {
// Kick the connection
this.close(4500, JSON.stringify({ message: error.message }));
}
};
}
protected async getApiKey() {
@@ -48,8 +139,8 @@ export class MothershipSocket extends CustomSocket {
protected async getHeaders() {
const apiKey = apiManager.getKey('my_servers')?.key!;
const keyFile = varState.data?.regFile ? readFileIfExists(varState.data?.regFile).toString('base64') : '';
const serverName = `${varState.data?.name}`;
const lanIp = networkState.data.find(network => network.ipaddr[0]).ipaddr[0] || '';
const serverName = `${varState.data?.name as string}`;
const lanIp: string = networkState.data.find(network => network.ipaddr[0]).ipaddr[0] || '';
const machineId = `${await getMachineId()}`;
return {
@@ -62,94 +153,15 @@ export class MothershipSocket extends CustomSocket {
};
}
onConnect() {
const mothership = this;
const onConnect = super.onConnect;
return async function (this: WebSocketWithHeartBeat) {
try {
// Run super
onConnect();
// Connect to local graphql
mothership.connectToInternalGraphql();
// Sub to /servers on mothership
mothership.connectToMothershipsGraphql();
} catch (error) {
this.close(error.code.length === 4 ? error.code : `4${error.code}`, JSON.stringify({
message: error.message ?? 'Internal Server Error'
}));
}
};
private connectToInternalGraphql(options: InternalGraphql['options'] = {}) {
this.internalGraphqlSocket = new InternalGraphql(options);
}
onDisconnect() {
const mothership = this;
const onDisconnect = super.onDisconnect;
return async function (this: WebSocketWithHeartBeat, code: number, _message: string) {
try {
// Close connection to local graphql endpoint
mothership.internalGraphqlSocket?.connection?.close(200);
// Close connection to motherships's server's endpoint
mothership.disconnectFromMothershipsGraphql();
// Process disconnection
onDisconnect();
} catch (error) {
mothership.logger.debug('Connection closed with code=%s reason="%s"', code, error.message);
}
};
private async connectToMothershipsGraphql() {
this.mothershipServersEndpoint = await subscribeToServers(this.apiKey);
}
// When we get a message from relay send it through to our local graphql instance
onMessage() {
const mothership = this;
return async function (this: WebSocketWithHeartBeat, data: string) {
try {
await mothership.sendMessage(mothership.internalGraphqlSocket?.connection, data);
} catch (error) {
// Something weird happened while processing the message
// This is likely a malformed message
mothership.logger.error('Failed sending message to relay.', error);
}
};
}
onError() {
const mothership = this;
return async function (this: WebSocketWithHeartBeat, error: NodeJS.ErrnoException) {
try {
mothership.logger.error(error);
// The relay is down
if (error.message.includes('502')) {
// Sleep for 30 seconds
await sleep(ONE_MINUTE / 2);
}
// Connection refused, aka couldn't connect
// This is usually because the address is wrong or offline
if (error.code === 'ECONNREFUSED') {
// @ts-expect-error
mothership.logger.debug('Couldn\'t connect to %s:%s', error.address, error.port);
return;
}
// Closed before connection started
if (error.toString().includes('WebSocket was closed before the connection was established')) {
mothership.logger.debug(error.message);
return;
}
throw error;
} catch {
// Unknown error
mothership.logger.error('socket error', error);
} finally {
// Kick the connection
this.close(4500, JSON.stringify({ message: error.message }));
}
};
private async disconnectFromMothershipsGraphql() {
this.mothershipServersEndpoint?.unsubscribe();
}
}

View File

@@ -1,8 +1,7 @@
import { pubsub } from '../core';
import { pubsub, log as logger } from '../core';
import { SubscriptionClient } from 'graphql-subscriptions-client';
import { MOTHERSHIP_GRAPHQL_LINK, ONE_SECOND } from '../consts';
import { userCache, CachedServers } from '../cache';
import { log as logger } from '../core';
const log = logger.createChild({ prefix: 'subscribe-to-servers' });
const client = new SubscriptionClient(MOTHERSHIP_GRAPHQL_LINK, {

View File

@@ -16,7 +16,7 @@ export const applyJitter = (value: number) => {
};
export const backoff = (attempt: number, maxDelay: number, multiplier: number) => {
const delay = applyJitter(Math.pow(2.0, attempt - 1.0) * 0.5);
const delay = applyJitter(2.0 ** (attempt - 1.0) * 0.5);
return Math.round(Math.min(delay * multiplier, maxDelay));
};

View File

@@ -1,6 +1,7 @@
import type { CoreResult } from './core/types';
import type { CoreContext, CoreResult } from './core/types';
import { pubsub, coreLogger } from './core';
import { debugTimer } from './core/utils';
import { debugTimer, isNodeError } from './core/utils';
import { AppError } from './core/errors';
/**
* Publish update to topic channel.
@@ -26,7 +27,7 @@ export const publish = async (channel: string, mutation: string, node?: Record<s
interface RunOptions {
node?: Record<string, unknown>;
moduleToRun?: (context: any) => CoreResult;
moduleToRun?: (context: CoreContext) => Promise<CoreResult>;
context?: any;
}
@@ -57,14 +58,16 @@ export const run = async (channel: string, mutation: string, options: RunOptions
coreLogger.silly(`run:${moduleToRun.name}`, JSON.stringify(result.json));
// Save result
publish(channel, mutation, result.json);
} catch (error: any) {
// Ensure we aren't leaking anything in production
if (process.env.NODE_ENV === 'production') {
coreLogger.debug('Error:', error.message);
} else {
const logger = coreLogger[error.status && error.status >= 400 ? 'error' : 'warn'].bind(coreLogger);
logger('Error:', error.message);
await publish(channel, mutation, result.json);
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
// Ensure we aren't leaking anything in production
if (process.env.NODE_ENV === 'production') {
coreLogger.debug('Error:', error.message);
} else {
const logger = coreLogger[error.status && error.status >= 400 ? 'error' : 'warn'].bind(coreLogger);
logger('Error:', error.message);
}
}
}

View File

@@ -84,7 +84,6 @@ app.get('/', (_, res) => {
});
// Handle errors by logging them and returning a 500.
// eslint-disable-next-line @typescript-eslint/no-var-requires
app.use((error, _, res, __) => {
log.error(error);
if (error.stack) {