fix(connect): fatal race-condition in websocket disposal (#1462)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Bug Fixes**
* Improved error handling during client shutdown to prevent unhandled
exceptions and ensure smoother cleanup.
* Optimized identity state change detection to avoid redundant event
emissions, reducing unnecessary updates when no actual changes occur.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1210701591479200
This commit is contained in:
Pujit Mehrotra
2025-07-03 13:47:48 -04:00
committed by GitHub
parent a2807864ac
commit 0ec0de982f
2 changed files with 28 additions and 9 deletions

View File

@@ -3,10 +3,11 @@ import { ConfigService } from '@nestjs/config';
import { EventEmitter2 } from '@nestjs/event-emitter';
import type { OutgoingHttpHeaders } from 'node:http2';
import { isEqual } from 'lodash-es';
import { Subscription } from 'rxjs';
import { bufferTime, filter } from 'rxjs/operators';
import { debounceTime, filter } from 'rxjs/operators';
import { ConnectionMetadata, MinigraphStatus, MyServersConfig } from '../config/connect.config.js';
import { ConnectionMetadata, MinigraphStatus } from '../config/connect.config.js';
import { EVENTS } from '../helper/nest-tokens.js';
interface MothershipWebsocketHeaders extends OutgoingHttpHeaders {
@@ -58,6 +59,7 @@ export class MothershipConnectionService implements OnModuleInit, OnModuleDestro
};
private identitySubscription: Subscription | null = null;
private lastIdentity: Partial<IdentityState> | null = null;
private metadataChangedSubscription: Subscription | null = null;
constructor(
@@ -83,12 +85,22 @@ export class MothershipConnectionService implements OnModuleInit, OnModuleDestro
this.identitySubscription = this.configService.changes$
.pipe(
filter((change) => Object.values(this.configKeys).includes(change.path)),
bufferTime(25)
// debouncing is necessary here (instead of buffering/batching) to prevent excess emissions
// because the store.* config values will change frequently upon api boot
debounceTime(25)
)
.subscribe({
next: () => {
const { state } = this.getIdentityState();
if (isEqual(state, this.lastIdentity)) {
this.logger.debug('Identity unchanged; skipping event emission');
return;
}
this.lastIdentity = structuredClone(state);
const success = this.eventEmitter.emit(EVENTS.IDENTITY_CHANGED);
if (!success) {
if (success) {
this.logger.debug('Emitted IDENTITY_CHANGED event');
} else {
this.logger.warn('Failed to emit IDENTITY_CHANGED event');
}
},

View File

@@ -116,16 +116,23 @@ export class MothershipGraphqlClientService implements OnModuleInit, OnModuleDes
*/
async clearInstance(): Promise<void> {
if (this.apolloClient) {
await this.apolloClient.clearStore();
// some race condition causes apolloClient to be null here upon api shutdown?
this.apolloClient?.stop();
try {
await this.apolloClient.clearStore();
// some race condition causes apolloClient to be null here upon api shutdown?
this.apolloClient?.stop();
} catch (error) {
this.logger.warn(error, 'Error clearing apolloClient');
}
this.apolloClient = null;
}
if (this.wsClient) {
this.clearClientEventHandlers();
this.wsClient.terminate();
await this.wsClient.dispose();
try {
await this.wsClient.dispose();
} catch (error) {
this.logger.warn(error, 'Error disposing of wsClient');
}
this.wsClient = null;
}