From df3d350ba3a1ca9583ef154b9fb0ef51b204971a Mon Sep 17 00:00:00 2001 From: Alexis Tyler Date: Sun, 20 Oct 2019 18:00:18 +1030 Subject: [PATCH] refactor(module:schema/resolvers): switch to event based system --- app/graphql/schema/resolvers.js | 185 +++++++++++++------------------- 1 file changed, 75 insertions(+), 110 deletions(-) diff --git a/app/graphql/schema/resolvers.js b/app/graphql/schema/resolvers.js index b13fc5272..9f59d06bb 100644 --- a/app/graphql/schema/resolvers.js +++ b/app/graphql/schema/resolvers.js @@ -3,8 +3,11 @@ * Written by: Alexis Tyler */ -module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pubsub, PluginManager, log, PluginError, dee) { - const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); +module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pubsub, PluginManager, log, PluginError, dee, Bottleneck, sleep, debugTimer, setIntervalAsync) { + // Once per second + const limiter = new Bottleneck({ + minTime: 1 + }); /** * Run a module and update pubsub @@ -17,13 +20,13 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub * @param {String} [options.filePath] * @param {Number} [options.interval = 1000] * @param {Number} [options.total = 1] - * @param {Boolean} [options.forever = false] + * @param {Object} [options.context = {}] */ const run = async (channel, mutation, { node, moduleToRun, filePath, - forever = false + context = {} }) => { /** * @type {Map} @@ -49,18 +52,37 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub try { // Run module - let result = await (filePath ? $injector.resolvePath(filePath) : $injector.resolveModule(`module:${moduleToRun}`)); + const result = await new Promise(async resolve => { + if (filePath) { + debugTimer(`run:${filePath}`); + const result = await $injector.resolvePath(filePath, { + context + }); - // Run resolved function if it returns one. - if (typeof result === 'function') { - result = result(); - } + return resolve(result); + } + + debugTimer(`run:${moduleToRun}`); + const result = await $injector.resolveModule(`module:${moduleToRun}`, { + context + }); + + return resolve(result); + }).then(async possibleResult => { + // Await resolved function if it returns one. + if (typeof possibleResult === 'function') { + const result = await possibleResult(); + return result; + } + + return possibleResult; + }); if (filePath) { const [pluginName, moduleName] = channel.split('/'); log.debug('Plugin:', pluginName, 'Module:', moduleName, 'Result:', result); } else { - log.debug('Module:', channel, 'Result:', result); + log.debug('Module:', channel, 'Result:', result.json); } // Update pubsub channel @@ -80,21 +102,7 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub } } - // Bail - if (!forever) { - return; - } - - // Wait for CPU to chill - await sleep(1000); - - // Re-run - await run(channel, mutation, { - node, - moduleToRun, - filePath, - forever - }); + debugTimer(filePath ? `run:${filePath}` : `run:${moduleToRun}`); }; // Send test message every 1 second for 10 seconds. @@ -111,19 +119,22 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub // console.log(`CHANNEL: ping DATA: ${JSON.stringify(rest, null, 2)}`); // }); + const processDisks = async () => { + await run('array', 'UPDATED', { + moduleToRun: 'get-array', + context: {} + }); + }; + + pubsub.subscribe('disks', limiter.wrap(processDisks)); + const {withFilter} = $injector.resolve('graphql-subscriptions'); - const createBasicSubscription = (name, moduleToRun) => { - return { - subscribe: async () => { - run(name, 'UPDATED', { - moduleToRun, - forever: true - }); - return pubsub.asyncIterator(name); - } - }; - }; + const createBasicSubscription = name => ({ + subscribe: async () => { + return pubsub.asyncIterator(name); + } + }); // On Docker event update info with { apps: { installed, started } } const updateIterator = async () => { @@ -150,81 +161,23 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub }, Subscription: { apikeys: { - subscribe: () => { - // Not sure how we're going to secure this - return pubsub.asyncIterator('apikeys'); - } + // Not sure how we're going to secure this + ...createBasicSubscription('apikeys') }, array: { - ...createBasicSubscription('array', 'get-array') + ...createBasicSubscription('array') }, devices: { - ...createBasicSubscription('devices', 'get-devices') + ...createBasicSubscription('devices') }, dockerContainers: { - ...createBasicSubscription('docker/containers', 'docker/get-containers') + ...createBasicSubscription('docker/containers') }, dockerNetworks: { - ...createBasicSubscription('docker/networks', 'docker/get-networks') + ...createBasicSubscription('docker/networks') }, info: { - subscribe: () => pubsub.asyncIterator('info') - // Close() { - // console.debug('Clearing info subscription timers'); - - // // Clear all info subscription timers - // Object.entries($injector._graph).filter(([ name ]) => { - // return name.startsWith('timer:info'); - // }).map(([name, timer]) => { - // console.debug(`Clearing ${name} subscription timer`); - // clearInterval(timer); - // }); - // } - // subscribe: async () => { - // const infoFields = [ - // 'apps', - // 'cpu', - // 'devices', - // 'display', - // 'os', - // 'versions' - // ]; - - // const infoModules = infoFields.map(field => [field, $injector.resolveModule(`module:info/get-${field}`)]); - // let run = 0; - - // return { - // async next() { - // // Await each field to get new value - // const values = fromEntries(await asyncMap(infoModules, async ([field, _module]) => { - // return [field, await _module.then(result => result.json)]; - // })); - - // const result = { - // value: { - // info: { - // mutation: 'UPDATED', - // node: { - // ...values - // } - // } - // }, - // done: false - // }; - - // run = run + 1; - // console.log(`Run ${run}, ${Object.keys(values)}`); - // // Kill after 10 - // if (run >= 10) { - // return { value: null, done: true }; - // } - // return result; - // }, - // [Symbol.asyncIterator]() { - // return this; - // } - // }; - // } + ...createBasicSubscription('info') }, me: { subscribe: withFilter(() => pubsub.asyncIterator('user'), (payload, _, context) => payload.user.node.id === context.user.id), @@ -237,22 +190,31 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub } }, services: { - ...createBasicSubscription('services', 'get-services') + // ...createBasicSubscription('services', 'get-services') + subscribe: () => { + // This needs to be fixed to run from events + setInterval(limiter.wrap(async () => { + await run('services', 'UPDATED', { + moduleToRun: 'get-services' + }); + }), 100); + return pubsub.asyncIterator('services'); + } }, shares: { - ...createBasicSubscription('shares', 'get-shares') + ...createBasicSubscription('shares') }, unassignedDevices: { - ...createBasicSubscription('devices/unassigned', 'get-unassigned-devices') + ...createBasicSubscription('devices/unassigned') }, users: { - ...createBasicSubscription('users', 'get-users') + ...createBasicSubscription('users') }, vars: { - ...createBasicSubscription('vars', 'get-vars') + ...createBasicSubscription('vars') }, vms: { - ...createBasicSubscription('vms/domains', 'vms/get-domains') + ...createBasicSubscription('vms/domains') }, pluginModule: { subscribe: async (rootValue, directiveArgs) => { @@ -270,10 +232,13 @@ module.exports = function ($injector, GraphQLJSON, GraphQLLong, GraphQLUUID, pub const {filePath} = PluginManager.get(pluginName, pluginModuleName); - await run(name, 'UPDATED', { - filePath, - forever: true + // Ensure we start the run after we return + process.nextTick(() => { + run(name, 'UPDATED', { + filePath + }); }); + return pubsub.asyncIterator(name); } }