refactor(module:schema/resolvers): switch to event based system

This commit is contained in:
Alexis Tyler
2019-10-20 18:00:18 +10:30
parent 89b2dd2064
commit df3d350ba3

View File

@@ -3,8 +3,11 @@
* Written by: Alexis Tyler
*/
module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pubsub, PluginManager, log, PluginError, dee) {
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pubsub, PluginManager, log, PluginError, dee, Bottleneck, sleep, debugTimer, setIntervalAsync) {
// Once per second
const limiter = new Bottleneck({
minTime: 1
});
/**
* Run a module and update pubsub
@@ -17,13 +20,13 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub
* @param {String} [options.filePath]
* @param {Number} [options.interval = 1000]
* @param {Number} [options.total = 1]
* @param {Boolean} [options.forever = false]
* @param {Object} [options.context = {}]
*/
const run = async (channel, mutation, {
node,
moduleToRun,
filePath,
forever = false
context = {}
}) => {
/**
* @type {Map}
@@ -49,18 +52,37 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub
try {
// Run module
let result = await (filePath ? $injector.resolvePath(filePath) : $injector.resolveModule(`module:${moduleToRun}`));
const result = await new Promise(async resolve => {
if (filePath) {
debugTimer(`run:${filePath}`);
const result = await $injector.resolvePath(filePath, {
context
});
// Run resolved function if it returns one.
if (typeof result === 'function') {
result = result();
}
return resolve(result);
}
debugTimer(`run:${moduleToRun}`);
const result = await $injector.resolveModule(`module:${moduleToRun}`, {
context
});
return resolve(result);
}).then(async possibleResult => {
// Await resolved function if it returns one.
if (typeof possibleResult === 'function') {
const result = await possibleResult();
return result;
}
return possibleResult;
});
if (filePath) {
const [pluginName, moduleName] = channel.split('/');
log.debug('Plugin:', pluginName, 'Module:', moduleName, 'Result:', result);
} else {
log.debug('Module:', channel, 'Result:', result);
log.debug('Module:', channel, 'Result:', result.json);
}
// Update pubsub channel
@@ -80,21 +102,7 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub
}
}
// Bail
if (!forever) {
return;
}
// Wait for CPU to chill
await sleep(1000);
// Re-run
await run(channel, mutation, {
node,
moduleToRun,
filePath,
forever
});
debugTimer(filePath ? `run:${filePath}` : `run:${moduleToRun}`);
};
// Send test message every 1 second for 10 seconds.
@@ -111,19 +119,22 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub
// console.log(`CHANNEL: ping DATA: ${JSON.stringify(rest, null, 2)}`);
// });
const processDisks = async () => {
await run('array', 'UPDATED', {
moduleToRun: 'get-array',
context: {}
});
};
pubsub.subscribe('disks', limiter.wrap(processDisks));
const {withFilter} = $injector.resolve('graphql-subscriptions');
const createBasicSubscription = (name, moduleToRun) => {
return {
subscribe: async () => {
run(name, 'UPDATED', {
moduleToRun,
forever: true
});
return pubsub.asyncIterator(name);
}
};
};
const createBasicSubscription = name => ({
subscribe: async () => {
return pubsub.asyncIterator(name);
}
});
// On Docker event update info with { apps: { installed, started } }
const updateIterator = async () => {
@@ -150,81 +161,23 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub
},
Subscription: {
apikeys: {
subscribe: () => {
// Not sure how we're going to secure this
return pubsub.asyncIterator('apikeys');
}
// Not sure how we're going to secure this
...createBasicSubscription('apikeys')
},
array: {
...createBasicSubscription('array', 'get-array')
...createBasicSubscription('array')
},
devices: {
...createBasicSubscription('devices', 'get-devices')
...createBasicSubscription('devices')
},
dockerContainers: {
...createBasicSubscription('docker/containers', 'docker/get-containers')
...createBasicSubscription('docker/containers')
},
dockerNetworks: {
...createBasicSubscription('docker/networks', 'docker/get-networks')
...createBasicSubscription('docker/networks')
},
info: {
subscribe: () => pubsub.asyncIterator('info')
// Close() {
// console.debug('Clearing info subscription timers');
// // Clear all info subscription timers
// Object.entries($injector._graph).filter(([ name ]) => {
// return name.startsWith('timer:info');
// }).map(([name, timer]) => {
// console.debug(`Clearing ${name} subscription timer`);
// clearInterval(timer);
// });
// }
// subscribe: async () => {
// const infoFields = [
// 'apps',
// 'cpu',
// 'devices',
// 'display',
// 'os',
// 'versions'
// ];
// const infoModules = infoFields.map(field => [field, $injector.resolveModule(`module:info/get-${field}`)]);
// let run = 0;
// return {
// async next() {
// // Await each field to get new value
// const values = fromEntries(await asyncMap(infoModules, async ([field, _module]) => {
// return [field, await _module.then(result => result.json)];
// }));
// const result = {
// value: {
// info: {
// mutation: 'UPDATED',
// node: {
// ...values
// }
// }
// },
// done: false
// };
// run = run + 1;
// console.log(`Run ${run}, ${Object.keys(values)}`);
// // Kill after 10
// if (run >= 10) {
// return { value: null, done: true };
// }
// return result;
// },
// [Symbol.asyncIterator]() {
// return this;
// }
// };
// }
...createBasicSubscription('info')
},
me: {
subscribe: withFilter(() => pubsub.asyncIterator('user'), (payload, _, context) => payload.user.node.id === context.user.id),
@@ -237,22 +190,31 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub
}
},
services: {
...createBasicSubscription('services', 'get-services')
// ...createBasicSubscription('services', 'get-services')
subscribe: () => {
// This needs to be fixed to run from events
setInterval(limiter.wrap(async () => {
await run('services', 'UPDATED', {
moduleToRun: 'get-services'
});
}), 100);
return pubsub.asyncIterator('services');
}
},
shares: {
...createBasicSubscription('shares', 'get-shares')
...createBasicSubscription('shares')
},
unassignedDevices: {
...createBasicSubscription('devices/unassigned', 'get-unassigned-devices')
...createBasicSubscription('devices/unassigned')
},
users: {
...createBasicSubscription('users', 'get-users')
...createBasicSubscription('users')
},
vars: {
...createBasicSubscription('vars', 'get-vars')
...createBasicSubscription('vars')
},
vms: {
...createBasicSubscription('vms/domains', 'vms/get-domains')
...createBasicSubscription('vms/domains')
},
pluginModule: {
subscribe: async (rootValue, directiveArgs) => {
@@ -270,10 +232,13 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub
const {filePath} = PluginManager.get(pluginName, pluginModuleName);
await run(name, 'UPDATED', {
filePath,
forever: true
// Ensure we start the run after we return
process.nextTick(() => {
run(name, 'UPDATED', {
filePath
});
});
return pubsub.asyncIterator(name);
}
}