fix: two processes mounting to same port

This commit is contained in:
Alexis Tyler
2020-11-25 09:53:24 +10:30
parent 090734b5e1
commit 27d971bfee
7 changed files with 111 additions and 72 deletions
+9 -22
View File
@@ -3,7 +3,6 @@
* Written by: Alexis Tyler
*/
import { StoppableServer } from 'stoppable';
import path from 'path';
import glob from 'glob';
import exitHook from 'async-exit-hook';
@@ -19,6 +18,7 @@ import { subscribeToNchanEndpoint, isNchanUp } from './utils';
import { config } from './config';
import { pluginManager } from './plugin-manager';
import * as watchers from './watchers';
import { server as Server } from '../server';
// Have plugins loaded at least once
let pluginsLoaded = false;
@@ -197,7 +197,6 @@ const loaders = {
* @name core.load
*/
const load = async(): Promise<void> => {
coreLogger.debug('Starting...');
await loadStatePaths();
await loadPlugins();
await loadWatchers();
@@ -207,32 +206,18 @@ const load = async(): Promise<void> => {
if (process.env.NCHAN !== 'disable') {
await loadNchan();
}
coreLogger.debug('Loaded!');
};
/**
* A server instance.
*/
interface Server {
server: StoppableServer;
start: () => Promise<StoppableServer> | StoppableServer;
stop: () => Promise<void> | void;
}
/**
* Loads a server.
*
* @name core.loadServer
* @param name The name of the server instance to load.
*/
export const loadServer = async(name: string, server: Server): Promise<void> => {
export const loadServer = async(name: string, server: typeof Server): Promise<void> => {
// Set process title
process.title = name;
// Human readable name
const serverName = `@unraid/${name}`;
// Start the server.
coreLogger.debug('Starting server');
@@ -242,18 +227,20 @@ export const loadServer = async(name: string, server: Server): Promise<void> =>
});
// Start server
await server.start();
coreLogger.debug(`Started ${name}`);
await server.start().catch(error => {
log.error(error);
});
// On process exit
exitHook(async() => {
if (process.env.DEBUG) {
// Only do this when there's a TTY present
if (process.stdout.isTTY) {
// Ensure we go back to the start of the line
// this causes the ^C the be overridden on a CTRL+C
process.stdout.write('\r');
coreLogger.info('Stopping server');
}
coreLogger.debug('Stopping server');
// Stop the server
await server.stop();
+18 -4
View File
@@ -6,6 +6,7 @@
import { format } from 'util';
import chalk from 'chalk';
import SysLogger from 'ain2';
import getCurrentLine from 'get-current-line';
import getHex from 'number-to-color/hexMap.js';
import { redactSecrets } from 'redact-secrets';
@@ -13,7 +14,7 @@ const levels = ['error', 'warn', 'info', 'debug', 'trace'] as const;
const transports = ['console', 'syslog'] as const;
class Logger {
public level = 'info' as typeof levels[number];
public level = (process.env.LOG_LEVEL ?? 'info') as typeof levels[number];
public levels = levels;
public transport = (process.env.DEBUG ? 'console' : 'syslog') as typeof transports[number];
public transports = transports;
@@ -35,6 +36,19 @@ class Logger {
return chalk.hex(hex)(string);
}
private _getLineInfo(offset = 0) {
// Bail unless we're in debug mode and we have line info enabled
if (!process.env.DEBUG && process.env.LINE_INFO) return;
const { line: lineNumber, file } = getCurrentLine({
frames: 3 + offset
});
const cwd = process.cwd();
const filePath = file.startsWith(cwd) ? file.replace(cwd, '.') : file;
const lineInfo = `${filePath}:${lineNumber}`;
return `[${chalk.hex('FF4500')(lineInfo)}]`;
}
constructor(private prefix: string = '') {
try {
this.syslog = new SysLogger({
@@ -100,7 +114,7 @@ class Logger {
}
debug(message: string, ...args: any[]): void {
this.log('debug', message, args);
this.log('debug', message, [...args, this._getLineInfo()]);
}
info(message: string, ...args: any[]): void {
@@ -115,9 +129,9 @@ class Logger {
error(message: string, ...args: any[]): void;
error(message: any, ...args: any[]): void {
if (message instanceof Error) {
this.log('error', message.message, args);
this.log('error', message.message, [...args, this._getLineInfo()]);
} else {
this.log('error', message, args);
this.log('error', message, [...args, this._getLineInfo()]);
}
}
+1 -1
View File
@@ -32,6 +32,6 @@ export const exitApp = (error?: Error, exitCode?: number) => {
coreLogger.error(error);
// Kill application
process.exitCode = exitCode;
process.exit(exitCode);
}
};
+1 -1
View File
@@ -49,7 +49,7 @@ export const getNodeService = async (user: User, namespace: string): Promise<Nod
return pid;
}
coreLogger.trace(`No PID found in cache for ${namespace}`);
coreLogger.debug('No PID found in cache for %s', namespace);
pid = await execa.command(`pidof ${namespace}`)
.then(output => {
const pids = cleanStdout(output).split('\n');
+76 -44
View File
@@ -6,6 +6,7 @@
import fs from 'fs';
import net from 'net';
import path from 'path';
import execa from 'execa';
import stoppable from 'stoppable';
import chokidar from 'chokidar';
import express from 'express';
@@ -13,7 +14,7 @@ import http from 'http';
import WebSocket from 'ws';
import { ApolloServer } from 'apollo-server-express';
import { log, config, utils, paths, pubsub, apiManager, coreLogger } from './core';
import { getEndpoints, globalErrorHandler, exitApp } from './core/utils';
import { getEndpoints, globalErrorHandler, exitApp, cleanStdout, sleep } from './core/utils';
import { graphql } from './graphql';
import { mothership } from './mothership';
import display from './graphql/resolvers/query/display';
@@ -96,38 +97,6 @@ app.use((error, _, res, __) => {
const httpServer = http.createServer(app);
const stoppableServer = stoppable(httpServer);
const handleError = error => {
if (error.code !== 'EADDRINUSE') {
throw error;
}
if (!isNaN(parseInt(port, 10))) {
throw error;
}
stoppableServer.close();
net.connect({
path: port
}, () => {
// Really in use: re-throw
throw error;
}).on('error', (error: NodeJS.ErrnoException) => {
if (error.code !== 'ECONNREFUSED') {
log.error(error);
process.exitCode = 1;
}
// Not in use: delete it and re-listen
fs.unlinkSync(port);
setTimeout(() => {
stoppableServer.listen(port);
}, 1000);
});
};
// Port is a UNIX socket file
if (isNaN(parseInt(port, 10))) {
stoppableServer.on('listening', () => {
@@ -135,7 +104,64 @@ if (isNaN(parseInt(port, 10))) {
return fs.chmodSync(port, 660);
});
stoppableServer.on('error', handleError);
stoppableServer.on('error', async (error: NodeJS.ErrnoException) => {
if (error.code !== 'EADDRINUSE') {
coreLogger.error(error);
throw error;
}
// Check if port is unix socket or numbered port
// If it's a numbered port then throw
if (!isNaN(parseInt(port, 10))) {
throw error;
}
// Check if the process that made this file is still alive
const pid = await execa.command(`lsof -t ${port}`)
.then(output => {
const pids = cleanStdout(output).split('\n');
return pids[0];
});
// Try to kill it?
if (pid) {
await execa.command(`kill -9 ${pid}`);
await sleep(2000);
}
// No pid found or we just killed the old process
// Now let's retry
// Stop the server
stoppableServer.close();
// Restart the server
net.connect({
path: port
}, () => {
exitApp();
}).on('error', (error: NodeJS.ErrnoException) => {
// Port was set to a path that already exists and isn't a unix socket
// Let's bail since we don't know if this was intentional
if (error.code === 'ENOTSOCK') {
coreLogger.debug('%s is not a unix socket and already exists', port);
exitApp();
}
if (error.code !== 'ECONNREFUSED') {
log.error(error);
process.exitCode = 1;
}
// Not in use: delete it and re-listen
fs.unlinkSync(port);
setTimeout(() => {
stoppableServer.listen(port);
}, 1000);
});
});
process.on('uncaughtException', (error: NodeJS.ErrnoException) => {
// Skip EADDRINUSE as it's already handled above
@@ -164,22 +190,28 @@ stoppableServer.on('upgrade', (request, socket, head) => {
// Add graphql subscription handlers
graphApp.installSubscriptionHandlers(wsServer);
const attachApiManagerToMothershipListeners = () => {
// If key is in an invalid format disconnect
apiManager.on('expire', async () => {
await mothership.disconnect();
});
// If key looks valid try and connect with it
apiManager.on('replace', async () => {
await mothership.connect(wsServer);
});
};
export const server = {
httpServer,
server: stoppableServer,
async start() {
// If key is in an invalid format disconnect
apiManager.on('expire', async () => {
await mothership.disconnect();
});
// If key looks valid try and connect with it
apiManager.on('replace', async (name, { key }) => {
await mothership.connect(wsServer);
});
// Start http server
return stoppableServer.listen(port, () => {
// Start listening to API key changes
// When the key changes either disconnect or connect
attachApiManagerToMothershipListeners();
// Downgrade process user to owner of this file
return fs.stat(__filename, (error, stats) => {
if (error) {
+5
View File
@@ -8210,6 +8210,11 @@
"resolved": "https://registry.npmjs.org/get-caller-file/-/get-caller-file-2.0.5.tgz",
"integrity": "sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg=="
},
"get-current-line": {
"version": "6.3.0",
"resolved": "https://registry.npmjs.org/get-current-line/-/get-current-line-6.3.0.tgz",
"integrity": "sha512-BaSlXSJxaWfR8Zk4uqTVl5Mn85iSfDG3i3Fw6t/xccQzn+G83UW5rirXxINJb8Ifqm6QfL3GZNxWAJksQtwo8g=="
},
"get-intrinsic": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.0.1.tgz",
+1
View File
@@ -64,6 +64,7 @@
"express-list-endpoints": "^5.0.0",
"filter-obj": "^2.0.1",
"flatten": "^1.0.3",
"get-current-line": "^6.3.0",
"get-server-address": "^1.0.1",
"glob": "^7.1.6",
"globby": "^11.0.1",