fix: ensure we connect to the correct sockets

This commit is contained in:
Alexis Tyler
2021-02-03 09:31:16 +10:30
parent 1b50dfb1c0
commit 434a4d25e4
8 changed files with 103 additions and 66 deletions
+11 -3
View File
@@ -82,9 +82,12 @@ export class ApiManager extends EventEmitter {
});
}
// Load inital keys in
this.checkKey(configPath, true).catch(error => {
log.debug('Failing loading inital keys');
// Load my_servers key
log.debug('Loading MyServers API key...');
this.checkKey(configPath, true).then(() => {
log.debug('Loaded MyServers API key!');
}).catch(error => {
log.debug('Failing loading MyServers API key with %s', error);
});
}
@@ -102,6 +105,7 @@ export class ApiManager extends EventEmitter {
this.add(name, key, options);
// Emit update
log.debug('Emitting "replace" event');
this.emit('replace', name, this.getKey(name));
}
@@ -130,6 +134,7 @@ export class ApiManager extends EventEmitter {
this.keys.add(name, keyObject, ttl);
// Emit update
log.debug('Emitting "add" event');
this.emit('add', name, this.getKey(name));
}
@@ -264,6 +269,7 @@ export class ApiManager extends EventEmitter {
private async checkKey(filePath: string, force = false) {
const lock = await this.getLock();
try {
coreLogger.debug('Checking API key for validity.');
const file = loadState<{ remote: { apikey: string } }>(filePath);
const apiKey = dotProp.get(file, 'remote.apikey')! as string;
@@ -275,9 +281,11 @@ export class ApiManager extends EventEmitter {
// Ensure key format is valid before validating
validateApiKeyFormat(apiKey);
coreLogger.debug('API key is in the correct format, checking key\'s validity...');
// Ensure key is valid before connecting
await validateApiKey(apiKey);
coreLogger.debug('API key is valid.');
// Add the new key
this.replace('my_servers', apiKey, {
+4
View File
@@ -26,6 +26,10 @@ export class State {
return this._data;
}
get data() {
return this._data;
}
set data(data: any) {
this._data = data;
}
+1
View File
@@ -380,6 +380,7 @@ setIntervalAsync(async () => {
}, 1000);
export const graphql = {
debug,
introspection: debug,
playground: debug ? {
subscriptionEndpoint: '/graphql'
+48 -16
View File
@@ -8,10 +8,11 @@ import am from 'am';
import * as Sentry from '@sentry/node';
import exitHook from 'async-exit-hook';
import getServerAddress from 'get-server-address';
import { core, states, coreLogger, log } from './core';
import { core, states, coreLogger, log, apiManager } from './core';
import { server } from './server';
import { apiManager } from './core';
import { InternalGraphql, MothershipSocket } from './mothership';
import { sockets } from './sockets';
import { sleep } from './core/utils';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { version } = require('../package.json') as { version: string };
@@ -33,10 +34,15 @@ Sentry.setUser({
// Boot app
am(async () => {
let lastknownApiKey: string;
const apiManagerLogger = log.createChild({
prefix: 'ApiManager'
});
// Load core
await core.load();
// Let's try and load the HTTP server
// Try and load the HTTP server
coreLogger.debug('Starting HTTP server');
// Log only if the server actually binds to the port
@@ -57,12 +63,6 @@ am(async () => {
});
});
// Load nchan
await core.loadNchan();
const sockets = new Map<string, MothershipSocket | InternalGraphql>();
let lastknownApiKey: string;
// If key is removed then disconnect our sockets
apiManager.on('expire', async name => {
try {
@@ -71,14 +71,21 @@ am(async () => {
return;
}
// Disconnect relay
apiManagerLogger.debug('Disconnecting relay');
await sockets.get('relay')?.disconnect();
// Disconnect internal graphql
apiManagerLogger.debug('Disconnecting internalGraphql');
await sockets.get('internalGraphql')?.disconnect();
} catch (error: unknown) {
log.error('Failed updating sockets on apiKey "expire" event with error %s.', error);
apiManagerLogger.error('Failed updating sockets on apiKey "expire" event with error %s.', error);
}
});
// If the key changes try to (re)connect to Mothership
// The internal graphql check needs to be done
// first so it'll be up before relay connects
apiManager.on('replace', async (name, newApiKey) => {
try {
// Bail if this isn't our key
@@ -86,15 +93,35 @@ am(async () => {
return;
}
// If we're missing our sockets let's create them
if (!sockets.has('relay') || !sockets.has('internalGraphql')) {
sockets.set('relay', new MothershipSocket({ apiKey: lastknownApiKey }));
sockets.set('internalGraphql', new InternalGraphql({ apiKey: lastknownApiKey }));
// If either socket is missing let's connect them
if (!sockets.has('internalGraphql') || !sockets.has('relay')) {
// Create internal graphql socket if it's missing
if (!sockets.has('internalGraphql')) {
// If the graphql server has no address it's likely still
// starting up so let's wait so we don't hit a 1006 error
if (server.server.address() !== null) {
apiManagerLogger.debug('Internal graphql isn\'t started, waiting 2s');
await sleep(2000);
}
// Create internal graphql socket
apiManagerLogger.debug('Creating internal graphql socket');
sockets.set('internalGraphql', new InternalGraphql({ apiKey: lastknownApiKey }));
}
// Create relay socket if it's missing
if (!sockets.has('relay')) {
// Create relay socket
apiManagerLogger.debug('Creating relay socket');
sockets.set('relay', new MothershipSocket({ apiKey: lastknownApiKey }));
}
return;
}
// If the key is the same as the one we're already connected with ignore it.
// Ignore this key if it's the same as our current key.
if (newApiKey === lastknownApiKey) {
apiManagerLogger.debug('API key has\'t changed');
return;
}
@@ -102,9 +129,14 @@ am(async () => {
await sockets.get('relay')?.reconnect();
await sockets.get('internalGraphql')?.reconnect();
} catch (error: unknown) {
log.error('Failed updating sockets on apiKey "replace" event with error %s.', error);
apiManagerLogger.error('Failed updating sockets on apiKey "replace" event with error %s.', error);
}
});
// Load nchan
core.loadNchan().catch(error => {
log.error(error);
});
}, async (error: NodeJS.ErrnoException) => {
// Send error to server for debugging
Sentry.captureException(error);
+4 -1
View File
@@ -153,9 +153,12 @@ export class CustomSocket {
}
public async reconnect() {
this.logger.debug('Reconnecting...');
await this.disconnect();
this.logger.debug('Disconnected, waiting 1s before reconnecting.');
await sleep(1000);
await this.connect();
this.logger.debug('Reconnected');
}
protected onDisconnect() {
@@ -247,7 +250,7 @@ export class CustomSocket {
protected async cleanup() {
// Kill existing socket connection
if (this.connection) {
if (this.connection?.heartbeat) {
this.connection.close(4200, JSON.stringify({
message: 'Reconnecting'
}));
+20 -18
View File
@@ -3,11 +3,9 @@ import { apiManager, relayLogger } from '../../core';
import { isNodeError, sleep } from '../../core/utils';
import { AppError } from '../../core/errors';
import { CustomSocket, WebSocketWithHeartBeat } from '../custom-socket';
import { MothershipSocket } from './mothership';
import { sockets } from '../../sockets';
export class InternalGraphql extends CustomSocket {
private readonly mothership?: MothershipSocket;
constructor(options: CustomSocket['options'] = {}) {
super({
name: 'InternalGraphql',
@@ -18,14 +16,20 @@ export class InternalGraphql extends CustomSocket {
}
onMessage() {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const logger = this.logger;
const sendMessage = this.sendMessage.bind(this);
return async function (this: WebSocketWithHeartBeat, data: string) {
try {
logger.silly('Recieved message for the API forwarding.');
self.mothership?.connection?.send(data);
logger.silly('Message sent to the API successfully.');
// Internal API accepted our authentication message
if (data === '{"type":"connection_ack"}') {
logger.debug('Internal graphql accepted authentication');
return;
}
logger.debug('Received message from the internal API, forwarding to the relay');
// Forward message
await sendMessage(sockets.get('relay')?.connection, data);
logger.debug('Message sent to the API successfully.');
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
// Relay socket is closed, close internal one
@@ -40,10 +44,7 @@ export class InternalGraphql extends CustomSocket {
}
onError() {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const logger = this.logger;
return async function (error: NodeJS.ErrnoException) {
return async (error: NodeJS.ErrnoException) => {
if (error.message === 'WebSocket was closed before the connection was established') {
// Likely the internal relay-ws connection was started but then mothership
// decided the key was invalid so it killed it
@@ -58,30 +59,31 @@ export class InternalGraphql extends CustomSocket {
await sleep(1000);
// Re-connect to internal graphql server
await self.connect();
await this.connect();
return;
}
logger.error(error);
this.logger.error(error);
};
}
onConnect() {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const apiKey = this.apiKey;
const logger = this.logger;
return async function (this: WebSocketWithHeartBeat) {
// No API key, close internal connection
if (!self.apiKey) {
if (!apiKey) {
this.close(4200, JSON.stringify({
message: 'No API key'
}));
}
// Authenticate with ourselves
logger.debug('Authenticating with internal graphql');
this.send(JSON.stringify({
type: 'connection_init',
payload: {
'x-api-key': self.apiKey
'x-api-key': apiKey
}
}));
};
+11 -28
View File
@@ -6,10 +6,9 @@ import { subscribeToServers } from '../subscribe-to-servers';
import { AppError } from '../../core/errors';
import { readFileIfExists } from '../utils';
import { CustomSocket, WebSocketWithHeartBeat } from '../custom-socket';
import { InternalGraphql } from './internal-graphql';
import { sockets } from '../../sockets';
export class MothershipSocket extends CustomSocket {
private internalGraphqlSocket?: CustomSocket;
private mothershipServersEndpoint?: {
unsubscribe: () => void;
};
@@ -25,7 +24,6 @@ export class MothershipSocket extends CustomSocket {
}
onConnect() {
const connectToInternalGraphql = this.connectToInternalGraphql.bind(this);
const connectToMothershipsGraphql = this.connectToMothershipsGraphql.bind(this);
const onConnect = super.onConnect.bind(this);
return async function (this: WebSocketWithHeartBeat) {
@@ -33,9 +31,6 @@ export class MothershipSocket extends CustomSocket {
// Run super
onConnect();
// Connect to local graphql
connectToInternalGraphql();
// Sub to /servers on mothership
await connectToMothershipsGraphql();
} catch (error: unknown) {
@@ -50,22 +45,16 @@ export class MothershipSocket extends CustomSocket {
}
onDisconnect() {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const logger = this.logger;
return async function (this: WebSocketWithHeartBeat, code: number, _message: string) {
return async (code: number, _message: string) => {
try {
// Close connection to local graphql endpoint
self.internalGraphqlSocket?.connection?.close(200);
// Close connection to motherships's server's endpoint
await self.disconnectFromMothershipsGraphql();
await this.disconnectFromMothershipsGraphql();
// Process disconnection
self.onDisconnect();
this.onDisconnect();
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
logger.debug('Connection closed with code=%s reason="%s"', code, error.message);
this.logger.debug('Connection closed with code=%s reason="%s"', code, error.message);
}
}
};
@@ -73,19 +62,17 @@ export class MothershipSocket extends CustomSocket {
// When we get a message from relay send it through to our local graphql instance
onMessage() {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const logger = this.logger;
return async function (this: WebSocketWithHeartBeat, data: string) {
const sendMessage = this.sendMessage.bind(this);
return async (data: string) => {
try {
logger.silly('Recieved message from mothership\'s relay, forwarding to the internal graphql connection');
await self.sendMessage.bind(self)(self.internalGraphqlSocket?.connection, data);
logger.silly('Message sent to the internal graphql connection successfully.');
this.logger.debug('Recieved message from mothership\'s relay, forwarding to the internal graphql connection');
await sendMessage(sockets.get('internalGraphql')?.connection, data);
this.logger.debug('Message sent to the internal graphql connection successfully.');
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
// Something weird happened while processing the message
// This is likely a malformed message
logger.error('Failed sending message to relay.', error);
this.logger.error('Failed sending message to relay.', error);
}
}
};
@@ -154,10 +141,6 @@ export class MothershipSocket extends CustomSocket {
};
}
private connectToInternalGraphql(options: InternalGraphql['options'] = {}) {
this.internalGraphqlSocket = new InternalGraphql(options);
}
private async connectToMothershipsGraphql() {
this.mothershipServersEndpoint = await subscribeToServers(this.apiKey);
}
+4
View File
@@ -0,0 +1,4 @@
import { InternalGraphql } from './mothership/sockets/internal-graphql';
import { MothershipSocket } from './mothership/sockets/mothership';
export const sockets = new Map<string, MothershipSocket | InternalGraphql>();