fix: nchan oom

This commit is contained in:
Alexis Tyler
2021-02-15 14:50:02 +10:30
parent 595c764c11
commit b8fd55579f
4 changed files with 122 additions and 97 deletions
+3 -19
View File
@@ -7,12 +7,11 @@ import path from 'path';
import glob from 'glob';
import camelCase from 'camelcase';
import globby from 'globby';
import pWaitFor from 'p-wait-for';
import pIteration from 'p-iteration';
import clearModule from 'clear-module';
import { coreLogger } from './log';
import { paths } from './paths';
import { subscribeToNchanEndpoint, isNchanUp } from './utils';
import { subscribeToNchanEndpoint } from './utils';
import { config } from './config';
import { pluginManager } from './plugin-manager';
import * as watchers from './watchers';
@@ -160,23 +159,8 @@ const loadNchan = async (): Promise<void> => {
coreLogger.debug('Trying to connect to nchan');
// Wait for nchan to be up.
await pWaitFor(isNchanUp, {
timeout: TEN_SECONDS,
interval: ONE_SECOND
})
// Once connected open a connection to each known endpoint
.then(async () => connectToNchanEndpoints(endpoints))
.catch(error => {
// Nchan is likely unreachable
if (error.message.includes('Promise timed out')) {
coreLogger.error('Nchan timed out while trying to establish a connection.');
return;
}
// Some other error occured
throw error;
});
// Connect to each known endpoint
await connectToNchanEndpoints(endpoints);
};
/**
+38 -77
View File
@@ -3,38 +3,31 @@
* Written by: Alexis Tyler
*/
import path from 'path';
import fetch from 'node-fetch';
import { debugTimer, parseConfig, sleep } from '..';
import xhr2 from 'xhr2';
import windowPolyFill from 'node-window-polyfill';
import { EventSource } from 'launchdarkly-eventsource';
import { parseConfig } from '..';
import * as states from '../../states';
import { coreLogger } from '../../log';
import { log } from '../../log';
import { AppError } from '../../errors';
const data = {};
const nchanLogger = log.createChild({
prefix: 'nchan'
});
// Load polyfills for nchan
windowPolyFill.register(false);
global.XMLHttpRequest = xhr2;
global.EventSource = EventSource;
// eslint-disable-next-line @typescript-eslint/no-var-requires
const NchanSubscriber = require('nchan');
const getSubEndpoint = () => {
const httpPort: string = states.varState.data?.port;
return `http://localhost:${httpPort}/sub`;
};
export const isNchanUp = async () => {
const isUp = await fetch(`${getSubEndpoint()}/non-existant`, {
method: 'HEAD'
})
.then(() => true)
.catch(error => {
// Socket is up but this endpoint is invalid
// That's to be expected though.
if (error.code === 'ECONNRESET') {
return true;
}
return false;
});
return isUp;
};
const endpointToStateMapping = {
// Cpuload: ,
devs: states.devicesState,
@@ -49,66 +42,34 @@ const endpointToStateMapping = {
var: states.varState
};
const subscribe = async (endpoint: string) => {
// Wait 1s before subscribing
await sleep(1000);
debugTimer(`subscribe(${endpoint})`);
const response = await fetch(`${getSubEndpoint()}/${endpoint}`).catch(async () => {
// If we throw then let's check if nchan is down
// or if it's an actual error
const isUp = await isNchanUp();
if (isUp) {
throw new AppError(`Cannot connect to nchan at ${getSubEndpoint()}/${endpoint}`);
}
throw new AppError('Cannot connect to nchan');
const subscribe = async (endpoint: string) => new Promise<void>(resolve => {
const sub = new NchanSubscriber(`${getSubEndpoint()}/${endpoint}`, {
subscriber: 'eventsource'
});
if (response.status === 502) {
// Status 502 is a connection timeout error,
// may happen when the connection was pending for too long,
// and the remote server or a proxy closed it
// let's reconnect
await subscribe(endpoint);
} else if (response.status === 200) {
// Get and show the message
const message = await response.text();
sub.on('connect', function (_event) {
nchanLogger.debug('Connected!');
resolve();
});
// Create endpoint field on data
if (!data[endpoint]) {
const fileName = endpoint + '.js';
data[endpoint] = {
handlerPath: path.resolve(__dirname, '../../states', fileName)
};
}
sub.on('message', function (message, _messageMetadata) {
try {
const state = parseConfig({
file: message,
type: 'ini'
});
// Only re-run parser if the message changed
if (data[endpoint].message !== message) {
data[endpoint].updated = new Date();
data[endpoint].message = message;
// Update state
endpointToStateMapping[endpoint].parse(state);
} catch {}
});
try {
const state = parseConfig({
file: message,
type: 'ini'
});
sub.on('error', function (error, error_description) {
nchanLogger.error('Error: "%s" \nDescription: "%s"', error, error_description);
});
// Update state
endpointToStateMapping[endpoint].parse(state);
} catch { }
}
debugTimer(`subscribe(${endpoint})`);
} else {
// An error - let's show it
coreLogger.error(JSON.stringify(response));
}
// Re-subscribe
await subscribe(endpoint);
};
sub.start();
});
export const subscribeToNchanEndpoint = async (endpoint: string) => {
if (!Object.keys(endpointToStateMapping).includes(endpoint)) {