fix: ensure we're using the websocketId for the subcriptions count check

This commit is contained in:
Alexis Tyler
2020-01-26 14:45:05 +10:30
parent a10bedfa3b
commit 9e5207c12d
3 changed files with 30 additions and 55 deletions

View File

@@ -298,17 +298,18 @@ export const graphql = {
types,
resolvers,
subscriptions: {
onConnect: connectionParams => {
onConnect: (connectionParams, websocket) => {
const apiKey = connectionParams['x-api-key'];
ensureApiKey(apiKey);
const user = usersState.findOne({apiKey}) || { name: 'guest', apiKey, role: 'guest' };
const websocketId = websocket.upgradeReq.headers['sec-websocket-key'];
log.info(`<ws> ${user.name} connected.`);
log.info(`<ws> ${user.name}[${websocketId}] connected.`);
// Update ws connection count and other needed values
wsHasConnected();
wsHasConnected(websocketId);
return {
user
@@ -316,7 +317,7 @@ export const graphql = {
},
onDisconnect: async (_, websocketContext) => {
const { user, websocketId } = await websocketContext.initPromise;
log.info(`<ws> ${user.name} disconnected.`);
log.info(`<ws> ${user.name}[${websocketId}] disconnected.`);
// Update ws connection count and other needed values
wsHasDisconnected(websocketId);

View File

@@ -85,7 +85,7 @@ const createSubscription = (channel, resource?) => ({
possession: 'any'
});
hasSubscribedToChannel(context.user, channel);
hasSubscribedToChannel(context.websocketId, channel);
return pubsub.asyncIterator(channel);
}
});
@@ -118,7 +118,7 @@ export const resolvers = {
ping: {
// subscribe: (_, __, context) => {
// // startPing();
// hasSubscribedToChannel(context.user, 'ping');
// hasSubscribedToChannel(context.websocketId, 'ping');
// return pubsub.asyncIterator('ping');
// }
},
@@ -156,7 +156,7 @@ export const resolvers = {
// It's up to the plugin to publish new data as needed
// so we'll just return the Iterator
hasSubscribedToChannel(context.user, channel);
hasSubscribedToChannel(context.websocketId, channel);
return pubsub.asyncIterator(channel);
}
}

View File

@@ -2,60 +2,37 @@ import core from '@unraid/core';
const { log } = core;
let connectionCount = 0;
const channelSubscriptions = {};
const wsSubscriptions = {};
interface subscription {
total: number
channels: string[]
}
const subscriptions: {
[key: string]: subscription
} = {};
/**
* Return current ws connection count.
*/
export const getWsConectionCount = () => connectionCount;
export const getWsConectionCount = () => {
return Object.values(subscriptions).filter(subscription => subscription.total >= 1).length;
};
/**
* Return current ws connection count in channel.
*/
export const getWsConectionCountInChannel = (channel: string) => {
return channelSubscriptions[channel].total;
};
/**
* Increase ws connection count by 1.
*/
export const increaseWsConectionCount = () => {
connectionCount++;
return connectionCount;
};
/**
* Decrease ws connection count by 1.
*/
export const decreaseWsConectionCount = () => {
connectionCount--;
return connectionCount;
return Object.values(subscriptions).filter(subscription => subscription.channels.includes(channel)).length;
};
export const hasSubscribedToChannel = (id: string, channel: string) => {
// Total ws connections per channel
if (!channelSubscriptions[channel]) {
channelSubscriptions[channel] = {
total: 0
};
}
channelSubscriptions[channel].total++;
// All subscriptions for this websocket
if (!wsSubscriptions[id]) {
wsSubscriptions[id] = [channel];
}
wsSubscriptions[id] = [
...wsSubscriptions[id],
channel
];
subscriptions[id].total++;
subscriptions[id].channels.push(channel);
};
export const hasUnsubscribedFromChannel = (id: string, channel: string) => {
channelSubscriptions[channel].total--;
wsSubscriptions[id] = wsSubscriptions[id].filter(existingChannel => existingChannel !== channel);
subscriptions[id].total--;
subscriptions[id].channels = subscriptions[id].channels.filter(existingChannel => existingChannel !== channel);
};
/**
@@ -63,8 +40,9 @@ export const hasUnsubscribedFromChannel = (id: string, channel: string) => {
*
* @param ws
*/
export const wsHasConnected = () => {
increaseWsConectionCount();
export const wsHasConnected = (id: string) => {
subscriptions[id].total = 0;
subscriptions[id].channels = [];
};
/**
@@ -73,12 +51,8 @@ export const wsHasConnected = () => {
* @param ws
*/
export const wsHasDisconnected = (id: string) => {
decreaseWsConectionCount();
// Update the total for each channel
wsSubscriptions[id].forEach((channel: string) => {
hasUnsubscribedFromChannel(id, channel);
});
subscriptions[id].total = 0;
subscriptions[id].channels = [];
};
// Only allows function to publish to pubsub when clients are online and are connected to the specific channel
@@ -96,6 +70,6 @@ export const canPublishToChannel = (channel: string) => {
}
const plural = channelConnectionCount !== 1;
log.debug(`Allowing publish to "${channel}" as there ${plural ? 'are' : 'is'} ${channelConnectionCount} connection ${plural ? 's' : ''} in that channel.`);
log.debug(`Allowing publish to "${channel}" as there ${plural ? 'are' : 'is'} ${channelConnectionCount} connection${plural ? 's' : ''} in that channel.`);
return true;
};