TS client: add an onLoss callback option to trigger some response to event loss.

This commit is contained in:
Sebastian Jeltsch
2026-04-07 11:24:21 +02:00
parent f044969298
commit 30d1920ce5
+67 -16
View File
@@ -74,11 +74,20 @@ export type RecordId = string | number;
export const ChangeEventStatusUnknown = 0 as const;
export const ChangeEventStatusForbidden = 1 as const;
export const ChangeEventStatusLoss = 2 as const;
export type ChangeEventStatus =
| typeof ChangeEventStatusUnknown
| typeof ChangeEventStatusForbidden
| typeof ChangeEventStatusLoss;
export type ChangeErrorEvent = {
seq?: number;
Error: {
status: ChangeEventStatus;
message?: string;
};
};
export type ChangeEvent =
| {
seq?: number;
@@ -92,13 +101,7 @@ export type ChangeEvent =
seq?: number;
Delete: object;
}
| {
seq?: number;
Error: {
status: ChangeEventStatus;
message?: string;
};
};
| ChangeErrorEvent;
// Re-export type publicly as `Event`. We cannot use `Event` to prevent rollup
// from renaming to `Event_2` to avoid a possible collision with the DOM
@@ -313,6 +316,10 @@ export class ListOperation<
}
export interface SubscribeOpts {
onLoss?: () => void;
}
export interface SubscribeFilterOpts {
filters?: FilterOrComposite[];
}
@@ -339,8 +346,13 @@ export interface RecordApi<T = Record<string, unknown>> {
delete(id: RecordId): Promise<void>;
deleteOp(id: RecordId): DeleteOperation;
subscribe(id: RecordId): Promise<ReadableStream<ChangeEvent>>;
subscribeAll(opts?: SubscribeOpts): Promise<ReadableStream<ChangeEvent>>;
subscribe(
id: RecordId,
opts?: SubscribeOpts,
): Promise<ReadableStream<ChangeEvent>>;
subscribeAll(
opts?: SubscribeOpts & SubscribeFilterOpts,
): Promise<ReadableStream<ChangeEvent>>;
}
/// Provides CRUD access to records through TrailBase's record API.
@@ -421,19 +433,22 @@ export class RecordApiImpl<
return new DeleteOperation(this.client, this.name, id);
}
public async subscribe(id: RecordId): Promise<ReadableStream<ChangeEvent>> {
return await this.subscribeImpl(id);
public async subscribe(
id: RecordId,
opts?: SubscribeOpts,
): Promise<ReadableStream<ChangeEvent>> {
return await this.subscribeImpl(id, opts);
}
public async subscribeAll(
opts?: SubscribeOpts,
opts?: SubscribeOpts & SubscribeFilterOpts,
): Promise<ReadableStream<ChangeEvent>> {
return await this.subscribeImpl("*", opts);
}
private async subscribeImpl(
id: RecordId,
opts?: SubscribeOpts,
opts?: SubscribeOpts & SubscribeFilterOpts,
): Promise<ReadableStream<ChangeEvent>> {
const params = new URLSearchParams();
const filters = opts?.filters ?? [];
@@ -457,9 +472,35 @@ export class RecordApiImpl<
const transformStream = new TransformStream<Uint8Array, ChangeEvent>({
transform(chunk: Uint8Array, controller) {
const messages = decoder.decode(chunk).trimEnd().split("\n\n");
const onLoss = opts?.onLoss;
let prevSeq: number | undefined;
for (const msg of messages) {
if (msg.startsWith("data: ")) {
controller.enqueue(parseChangeEvent(msg));
const ev = parseChangeEvent(msg);
if (onLoss !== undefined) {
// Check for losses between client and TrailBase server, e.g. unreliable network connection.
const seq = ev.seq;
if (
prevSeq !== undefined &&
seq !== undefined &&
prevSeq + 1 !== seq
) {
onLoss();
}
prevSeq = seq;
// Check for server-side losses, e.g. buffer limits exceeded.
const err = asError(ev);
if (err !== undefined) {
if (err.Error.status === ChangeEventStatusLoss) {
onLoss();
}
}
}
controller.enqueue(ev);
}
}
},
@@ -473,7 +514,7 @@ export class RecordApiImpl<
async subscribeWs(
id: RecordId,
opts?: SubscribeOpts,
opts?: SubscribeOpts & SubscribeFilterOpts,
): Promise<ReadableStream<ChangeEvent>> {
const params = new URLSearchParams();
params.append("ws", "true");
@@ -581,11 +622,21 @@ function parseChangeEvent(message: string): ChangeEvent {
return parseJSON(message.substring(6)) as ChangeEvent;
}
function asError(ev: ChangeEvent): ChangeErrorEvent | undefined {
if ("Error" in ev) {
return ev as ChangeErrorEvent;
}
}
const recordApiBasePath = "/api/records/v1";
export const exportedForTesting = isDev
? {
subscribeWs: (api: RecordApiImpl, id: RecordId) => api.subscribeWs(id),
subscribeWs: (
api: RecordApiImpl,
id: RecordId,
opts?: SubscribeOpts & SubscribeFilterOpts,
) => api.subscribeWs(id, opts),
parseChangeEvent,
}
: undefined;