diff --git a/crates/assets/js/client/src/record_api.ts b/crates/assets/js/client/src/record_api.ts index 2926d7ca..fbb36009 100644 --- a/crates/assets/js/client/src/record_api.ts +++ b/crates/assets/js/client/src/record_api.ts @@ -74,11 +74,20 @@ export type RecordId = string | number; export const ChangeEventStatusUnknown = 0 as const; export const ChangeEventStatusForbidden = 1 as const; export const ChangeEventStatusLoss = 2 as const; + export type ChangeEventStatus = | typeof ChangeEventStatusUnknown | typeof ChangeEventStatusForbidden | typeof ChangeEventStatusLoss; +export type ChangeErrorEvent = { + seq?: number; + Error: { + status: ChangeEventStatus; + message?: string; + }; +}; + export type ChangeEvent = | { seq?: number; @@ -92,13 +101,7 @@ export type ChangeEvent = seq?: number; Delete: object; } - | { - seq?: number; - Error: { - status: ChangeEventStatus; - message?: string; - }; - }; + | ChangeErrorEvent; // Re-export type publicly as `Event`. We cannot use `Event` to prevent rollup // from renaming to `Event_2` to avoid a possible collision with the DOM @@ -313,6 +316,10 @@ export class ListOperation< } export interface SubscribeOpts { + onLoss?: () => void; +} + +export interface SubscribeFilterOpts { filters?: FilterOrComposite[]; } @@ -339,8 +346,13 @@ export interface RecordApi> { delete(id: RecordId): Promise; deleteOp(id: RecordId): DeleteOperation; - subscribe(id: RecordId): Promise>; - subscribeAll(opts?: SubscribeOpts): Promise>; + subscribe( + id: RecordId, + opts?: SubscribeOpts, + ): Promise>; + subscribeAll( + opts?: SubscribeOpts & SubscribeFilterOpts, + ): Promise>; } /// Provides CRUD access to records through TrailBase's record API. @@ -421,19 +433,22 @@ export class RecordApiImpl< return new DeleteOperation(this.client, this.name, id); } - public async subscribe(id: RecordId): Promise> { - return await this.subscribeImpl(id); + public async subscribe( + id: RecordId, + opts?: SubscribeOpts, + ): Promise> { + return await this.subscribeImpl(id, opts); } public async subscribeAll( - opts?: SubscribeOpts, + opts?: SubscribeOpts & SubscribeFilterOpts, ): Promise> { return await this.subscribeImpl("*", opts); } private async subscribeImpl( id: RecordId, - opts?: SubscribeOpts, + opts?: SubscribeOpts & SubscribeFilterOpts, ): Promise> { const params = new URLSearchParams(); const filters = opts?.filters ?? []; @@ -457,9 +472,35 @@ export class RecordApiImpl< const transformStream = new TransformStream({ transform(chunk: Uint8Array, controller) { const messages = decoder.decode(chunk).trimEnd().split("\n\n"); + const onLoss = opts?.onLoss; + + let prevSeq: number | undefined; for (const msg of messages) { if (msg.startsWith("data: ")) { - controller.enqueue(parseChangeEvent(msg)); + const ev = parseChangeEvent(msg); + + if (onLoss !== undefined) { + // Check for losses between client and TrailBase server, e.g. unreliable network connection. + const seq = ev.seq; + if ( + prevSeq !== undefined && + seq !== undefined && + prevSeq + 1 !== seq + ) { + onLoss(); + } + prevSeq = seq; + + // Check for server-side losses, e.g. buffer limits exceeded. + const err = asError(ev); + if (err !== undefined) { + if (err.Error.status === ChangeEventStatusLoss) { + onLoss(); + } + } + } + + controller.enqueue(ev); } } }, @@ -473,7 +514,7 @@ export class RecordApiImpl< async subscribeWs( id: RecordId, - opts?: SubscribeOpts, + opts?: SubscribeOpts & SubscribeFilterOpts, ): Promise> { const params = new URLSearchParams(); params.append("ws", "true"); @@ -581,11 +622,21 @@ function parseChangeEvent(message: string): ChangeEvent { return parseJSON(message.substring(6)) as ChangeEvent; } +function asError(ev: ChangeEvent): ChangeErrorEvent | undefined { + if ("Error" in ev) { + return ev as ChangeErrorEvent; + } +} + const recordApiBasePath = "/api/records/v1"; export const exportedForTesting = isDev ? { - subscribeWs: (api: RecordApiImpl, id: RecordId) => api.subscribeWs(id), + subscribeWs: ( + api: RecordApiImpl, + id: RecordId, + opts?: SubscribeOpts & SubscribeFilterOpts, + ) => api.subscribeWs(id, opts), parseChangeEvent, } : undefined;