fix(connect): fatal race-condition in websocket disposal (#1462)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Bug Fixes**
* Improved error handling during client shutdown to prevent unhandled
exceptions and ensure smoother cleanup.
* Optimized identity state change detection to avoid redundant event
emissions, reducing unnecessary updates when no actual changes occur.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1210701591479200
This commit is contained in:
Pujit Mehrotra
2025-07-03 13:47:48 -04:00
committed by GitHub
parent a2807864ac
commit 0ec0de982f
2 changed files with 28 additions and 9 deletions

View File

@@ -3,10 +3,11 @@ import { ConfigService } from '@nestjs/config';
import { EventEmitter2 } from '@nestjs/event-emitter'; import { EventEmitter2 } from '@nestjs/event-emitter';
import type { OutgoingHttpHeaders } from 'node:http2'; import type { OutgoingHttpHeaders } from 'node:http2';
import { isEqual } from 'lodash-es';
import { Subscription } from 'rxjs'; import { Subscription } from 'rxjs';
import { bufferTime, filter } from 'rxjs/operators'; import { debounceTime, filter } from 'rxjs/operators';
import { ConnectionMetadata, MinigraphStatus, MyServersConfig } from '../config/connect.config.js'; import { ConnectionMetadata, MinigraphStatus } from '../config/connect.config.js';
import { EVENTS } from '../helper/nest-tokens.js'; import { EVENTS } from '../helper/nest-tokens.js';
interface MothershipWebsocketHeaders extends OutgoingHttpHeaders { interface MothershipWebsocketHeaders extends OutgoingHttpHeaders {
@@ -58,6 +59,7 @@ export class MothershipConnectionService implements OnModuleInit, OnModuleDestro
}; };
private identitySubscription: Subscription | null = null; private identitySubscription: Subscription | null = null;
private lastIdentity: Partial<IdentityState> | null = null;
private metadataChangedSubscription: Subscription | null = null; private metadataChangedSubscription: Subscription | null = null;
constructor( constructor(
@@ -83,12 +85,22 @@ export class MothershipConnectionService implements OnModuleInit, OnModuleDestro
this.identitySubscription = this.configService.changes$ this.identitySubscription = this.configService.changes$
.pipe( .pipe(
filter((change) => Object.values(this.configKeys).includes(change.path)), filter((change) => Object.values(this.configKeys).includes(change.path)),
bufferTime(25) // debouncing is necessary here (instead of buffering/batching) to prevent excess emissions
// because the store.* config values will change frequently upon api boot
debounceTime(25)
) )
.subscribe({ .subscribe({
next: () => { next: () => {
const { state } = this.getIdentityState();
if (isEqual(state, this.lastIdentity)) {
this.logger.debug('Identity unchanged; skipping event emission');
return;
}
this.lastIdentity = structuredClone(state);
const success = this.eventEmitter.emit(EVENTS.IDENTITY_CHANGED); const success = this.eventEmitter.emit(EVENTS.IDENTITY_CHANGED);
if (!success) { if (success) {
this.logger.debug('Emitted IDENTITY_CHANGED event');
} else {
this.logger.warn('Failed to emit IDENTITY_CHANGED event'); this.logger.warn('Failed to emit IDENTITY_CHANGED event');
} }
}, },

View File

@@ -116,16 +116,23 @@ export class MothershipGraphqlClientService implements OnModuleInit, OnModuleDes
*/ */
async clearInstance(): Promise<void> { async clearInstance(): Promise<void> {
if (this.apolloClient) { if (this.apolloClient) {
await this.apolloClient.clearStore(); try {
// some race condition causes apolloClient to be null here upon api shutdown? await this.apolloClient.clearStore();
this.apolloClient?.stop(); // some race condition causes apolloClient to be null here upon api shutdown?
this.apolloClient?.stop();
} catch (error) {
this.logger.warn(error, 'Error clearing apolloClient');
}
this.apolloClient = null; this.apolloClient = null;
} }
if (this.wsClient) { if (this.wsClient) {
this.clearClientEventHandlers(); this.clearClientEventHandlers();
this.wsClient.terminate(); try {
await this.wsClient.dispose(); await this.wsClient.dispose();
} catch (error) {
this.logger.warn(error, 'Error disposing of wsClient');
}
this.wsClient = null; this.wsClient = null;
} }