fix: add better socket lookup

This commit is contained in:
Alexis Tyler
2021-02-12 10:43:20 +10:30
parent f6a4b77380
commit b5044e5e2f
2 changed files with 23 additions and 20 deletions

View File

@@ -5,6 +5,7 @@ import { log } from '../core';
import { AppError } from '../core/errors';
import { isNodeError, sleep } from '../core/utils';
import { backoff } from './utils';
import { sockets } from '../sockets';
export interface WebSocketWithHeartBeat extends WebSocket {
heartbeat?: NodeJS.Timeout;
@@ -128,9 +129,7 @@ export class CustomSocket {
// Log we connected
this.logger.debug('Connected to %s', this.uri);
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
this.logger.error('Failed connecting reason=%s', error.message);
}
this.logger.error('Failed connecting reason=%s', (error as Error).message);
} finally {
lock.release();
}
@@ -144,9 +143,7 @@ export class CustomSocket {
this.connection.close(4200);
}
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
this.logger.error('Failed disconnecting reason=%s', error.message);
}
this.logger.error('Failed disconnecting reason=%s', (error as Error).message);
} finally {
lock.release();
}
@@ -154,11 +151,7 @@ export class CustomSocket {
public async reconnect() {
this.logger.debug('Reconnecting...');
await this.disconnect();
this.logger.debug('Disconnected, waiting 1s before reconnecting.');
await sleep(1000);
await this.connect();
this.logger.debug('Reconnected');
return this.disconnect();
}
protected onDisconnect() {
@@ -177,6 +170,17 @@ export class CustomSocket {
4401: async () => {
this.logger.debug('Invalid API key, waiting for new key...');
},
// Request Timeout - Mothership disconnected us.
4408: async () => {
// Mothership kicked this connection for any number of reasons
this.logger.debug('Kicked by mothership, reconnecting...');
// Wait for 5s before allowing reconnection
await sleep(ONE_SECOND * 5);
// Let's reset the reconnect count so we reconnect instantly
this.connectionAttempts = 0;
},
// Rate limited
4429: async message => {
try {
@@ -269,16 +273,17 @@ export class CustomSocket {
return true;
}
protected async sendMessage(client?: WebSocketWithHeartBeat, message?: string, timeout = 1000) {
protected async sendMessage(clientName: string, message?: string, timeout = 1000) {
const client = sockets.get(clientName)?.connection;
try {
if (!client || client.readyState === 0 || client.readyState === 3) {
if (!client || client.readyState === 0 || client.readyState === 2 || client.readyState === 3) {
this.logger.silly('Waiting %ss to retry sending to %s.', timeout / 1000, client?.url);
// Wait for $timeout seconds
await sleep(timeout);
// Retry sending
await this.sendMessage(client, message, timeout);
return;
return this.sendMessage(clientName, message, timeout);
}
// Only send when socket is open
@@ -291,9 +296,7 @@ export class CustomSocket {
// Failed replying as socket isn't open
this.logger.error('Failed replying to %s. state=%s message="%s"', client?.url, client.readyState, message);
} catch (error: unknown) {
if (isNodeError(error, AppError)) {
this.logger.error('Failed replying to %s.', client?.url, error);
}
this.logger.error('Failed replying to %s with %s.', client?.url, (error as Error).message);
}
}
@@ -319,6 +322,7 @@ export class CustomSocket {
this.connection = new WebSocket(this.uri, ['graphql-ws'], {
headers: await this.getHeaders()
});
this.connection.on('ping', heartbeat.bind(this.connection));
this.connection.on('error', this.onError());
this.connection.on('close', this.onDisconnect());

View File

@@ -3,7 +3,6 @@ import { apiManager, relayLogger } from '../../core';
import { isNodeError, sleep } from '../../core/utils';
import { AppError } from '../../core/errors';
import { CustomSocket, WebSocketWithHeartBeat } from '../custom-socket';
import { sockets } from '../../sockets';
export class InternalGraphql extends CustomSocket {
constructor(options: CustomSocket['options'] = {}) {
@@ -28,7 +27,7 @@ export class InternalGraphql extends CustomSocket {
logger.silly('Received message from the internal API, forwarding to the relay');
// Forward message
await sendMessage(sockets.get('relay')?.connection, data);
await sendMessage('relay', data);
logger.silly('Message sent to the API successfully.');
} catch (error: unknown) {
if (isNodeError(error, AppError)) {