From 9e5207c12d97026475f591bf04bf397d2793a23c Mon Sep 17 00:00:00 2001 From: Alexis Tyler Date: Sun, 26 Jan 2020 14:45:05 +1030 Subject: [PATCH] fix: ensure we're using the websocketId for the subcriptions count check --- src/graphql/index.ts | 9 +++-- src/graphql/schema/resolvers.ts | 6 +-- src/ws.ts | 70 +++++++++++---------------------- 3 files changed, 30 insertions(+), 55 deletions(-) diff --git a/src/graphql/index.ts b/src/graphql/index.ts index 19ee551f2..44af36509 100644 --- a/src/graphql/index.ts +++ b/src/graphql/index.ts @@ -298,17 +298,18 @@ export const graphql = { types, resolvers, subscriptions: { - onConnect: connectionParams => { + onConnect: (connectionParams, websocket) => { const apiKey = connectionParams['x-api-key']; ensureApiKey(apiKey); const user = usersState.findOne({apiKey}) || { name: 'guest', apiKey, role: 'guest' }; + const websocketId = websocket.upgradeReq.headers['sec-websocket-key']; - log.info(` ${user.name} connected.`); + log.info(` ${user.name}[${websocketId}] connected.`); // Update ws connection count and other needed values - wsHasConnected(); + wsHasConnected(websocketId); return { user @@ -316,7 +317,7 @@ export const graphql = { }, onDisconnect: async (_, websocketContext) => { const { user, websocketId } = await websocketContext.initPromise; - log.info(` ${user.name} disconnected.`); + log.info(` ${user.name}[${websocketId}] disconnected.`); // Update ws connection count and other needed values wsHasDisconnected(websocketId); diff --git a/src/graphql/schema/resolvers.ts b/src/graphql/schema/resolvers.ts index b04c280b9..8c2130df5 100644 --- a/src/graphql/schema/resolvers.ts +++ b/src/graphql/schema/resolvers.ts @@ -85,7 +85,7 @@ const createSubscription = (channel, resource?) => ({ possession: 'any' }); - hasSubscribedToChannel(context.user, channel); + hasSubscribedToChannel(context.websocketId, channel); return pubsub.asyncIterator(channel); } }); @@ -118,7 +118,7 @@ export const resolvers = { ping: { // subscribe: (_, __, context) => { // // startPing(); - // hasSubscribedToChannel(context.user, 'ping'); + // hasSubscribedToChannel(context.websocketId, 'ping'); // return pubsub.asyncIterator('ping'); // } }, @@ -156,7 +156,7 @@ export const resolvers = { // It's up to the plugin to publish new data as needed // so we'll just return the Iterator - hasSubscribedToChannel(context.user, channel); + hasSubscribedToChannel(context.websocketId, channel); return pubsub.asyncIterator(channel); } } diff --git a/src/ws.ts b/src/ws.ts index 800f0e6b8..6f19a4f53 100644 --- a/src/ws.ts +++ b/src/ws.ts @@ -2,60 +2,37 @@ import core from '@unraid/core'; const { log } = core; -let connectionCount = 0; -const channelSubscriptions = {}; -const wsSubscriptions = {}; +interface subscription { + total: number + channels: string[] +} + +const subscriptions: { + [key: string]: subscription +} = {}; /** * Return current ws connection count. */ -export const getWsConectionCount = () => connectionCount; +export const getWsConectionCount = () => { + return Object.values(subscriptions).filter(subscription => subscription.total >= 1).length; +}; /** * Return current ws connection count in channel. */ export const getWsConectionCountInChannel = (channel: string) => { - return channelSubscriptions[channel].total; -}; - -/** - * Increase ws connection count by 1. - */ -export const increaseWsConectionCount = () => { - connectionCount++; - return connectionCount; -}; - -/** - * Decrease ws connection count by 1. - */ -export const decreaseWsConectionCount = () => { - connectionCount--; - return connectionCount; + return Object.values(subscriptions).filter(subscription => subscription.channels.includes(channel)).length; }; export const hasSubscribedToChannel = (id: string, channel: string) => { - // Total ws connections per channel - if (!channelSubscriptions[channel]) { - channelSubscriptions[channel] = { - total: 0 - }; - } - channelSubscriptions[channel].total++; - - // All subscriptions for this websocket - if (!wsSubscriptions[id]) { - wsSubscriptions[id] = [channel]; - } - wsSubscriptions[id] = [ - ...wsSubscriptions[id], - channel - ]; + subscriptions[id].total++; + subscriptions[id].channels.push(channel); }; export const hasUnsubscribedFromChannel = (id: string, channel: string) => { - channelSubscriptions[channel].total--; - wsSubscriptions[id] = wsSubscriptions[id].filter(existingChannel => existingChannel !== channel); + subscriptions[id].total--; + subscriptions[id].channels = subscriptions[id].channels.filter(existingChannel => existingChannel !== channel); }; /** @@ -63,8 +40,9 @@ export const hasUnsubscribedFromChannel = (id: string, channel: string) => { * * @param ws */ -export const wsHasConnected = () => { - increaseWsConectionCount(); +export const wsHasConnected = (id: string) => { + subscriptions[id].total = 0; + subscriptions[id].channels = []; }; /** @@ -73,12 +51,8 @@ export const wsHasConnected = () => { * @param ws */ export const wsHasDisconnected = (id: string) => { - decreaseWsConectionCount(); - - // Update the total for each channel - wsSubscriptions[id].forEach((channel: string) => { - hasUnsubscribedFromChannel(id, channel); - }); + subscriptions[id].total = 0; + subscriptions[id].channels = []; }; // Only allows function to publish to pubsub when clients are online and are connected to the specific channel @@ -96,6 +70,6 @@ export const canPublishToChannel = (channel: string) => { } const plural = channelConnectionCount !== 1; - log.debug(`Allowing publish to "${channel}" as there ${plural ? 'are' : 'is'} ${channelConnectionCount} connection ${plural ? 's' : ''} in that channel.`); + log.debug(`Allowing publish to "${channel}" as there ${plural ? 'are' : 'is'} ${channelConnectionCount} connection${plural ? 's' : ''} in that channel.`); return true; }; \ No newline at end of file