Add non-transaction bulk support and expose bulk/transaction API as client.execute on JS/TS client.

This commit is contained in:
Bilux
2025-08-23 09:20:47 +00:00
committed by Sebastian Jeltsch
parent f317434e01
commit 698e38d8ae
4 changed files with 381 additions and 131 deletions
+1
View File
@@ -3,6 +3,7 @@ email {}
server {
application_name: "TrailBase"
logs_retention_sec: 604800
enable_record_transactions: true
}
auth {
oauth_providers: [{
+260 -90
View File
@@ -186,54 +186,167 @@ export type Or = {
export type FilterOrComposite = Filter | And | Or;
export interface RecordApi<T = Record<string, unknown>> {
list(opts?: {
pagination?: Pagination;
order?: string[];
filters?: FilterOrComposite[];
count?: boolean;
expand?: string[];
}): Promise<ListResponse<T>>;
export type RecordId = string | number;
read(
id: string | number,
opt?: {
expand?: string[];
},
): Promise<T>;
create(record: T): Promise<string | number>;
createBulk(records: T[]): Promise<(string | number)[]>;
update(id: string | number, record: Partial<T>): Promise<void>;
delete(id: string | number): Promise<void>;
subscribe(id: string | number): Promise<ReadableStream<Event>>;
// TODO: Use `ts-rs` generated types.
interface CreateOp {
Create: {
api_name: string;
value: Record<string, unknown>;
};
}
/// Provides CRUD access to records through TrailBase's record API.
export class RecordApiImpl<T = Record<string, unknown>>
implements RecordApi<T>
{
private readonly _path: string;
interface UpdateOp {
Update: {
api_name: string;
record_id: RecordId;
value: Record<string, unknown>;
};
}
interface DeleteOp {
Delete: {
api_name: string;
record_id: RecordId;
};
}
export interface DeferredOperation<ResponseType> {
query(): Promise<ResponseType>;
}
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface DeferredMutation<ResponseType>
extends DeferredOperation<ResponseType> {}
export class CreateOperation<T = Record<string, unknown>>
implements DeferredMutation<RecordId>
{
constructor(
private readonly client: Client,
private readonly name: string,
) {
this._path = `${recordApiBasePath}/${this.name}`;
private readonly apiName: string,
private readonly record: Partial<T>,
) {}
async query(): Promise<RecordId> {
const response = await this.client.fetch(
`${recordApiBasePath}/${this.apiName}`,
{
method: "POST",
body: JSON.stringify(this.record),
headers: jsonContentTypeHeader,
},
);
return (await response.json()).ids[0];
}
public async list<T = Record<string, unknown>>(opts?: {
pagination?: Pagination;
order?: string[];
filters?: FilterOrComposite[];
count?: boolean;
expand?: string[];
}): Promise<ListResponse<T>> {
protected toJSON(): CreateOp {
return {
Create: {
api_name: this.apiName,
value: this.record,
},
};
}
}
export class UpdateOperation<T = Record<string, unknown>>
implements DeferredMutation<void>
{
constructor(
private readonly client: Client,
private readonly apiName: string,
private readonly id: RecordId,
private readonly record: Partial<T>,
) {}
async query(): Promise<void> {
await this.client.fetch(`${recordApiBasePath}/${this.apiName}/${this.id}`, {
method: "PATCH",
body: JSON.stringify(this.record),
headers: jsonContentTypeHeader,
});
}
protected toJSON(): UpdateOp {
return {
Update: {
api_name: this.apiName,
record_id: this.id,
value: this.record,
},
};
}
}
export class DeleteOperation implements DeferredMutation<void> {
constructor(
private readonly client: Client,
private readonly apiName: string,
private readonly id: RecordId,
) {}
async query(): Promise<void> {
await this.client.fetch(`${recordApiBasePath}/${this.apiName}/${this.id}`, {
method: "DELETE",
});
}
protected toJSON(): DeleteOp {
return {
Delete: {
api_name: this.apiName,
record_id: this.id,
},
};
}
}
export interface ReadOpts {
expand?: string[];
}
export class ReadOperation<T = Record<string, unknown>>
implements DeferredOperation<T>
{
constructor(
private readonly client: Client,
private readonly apiName: string,
private readonly id: RecordId,
private readonly opt?: ReadOpts,
) {}
async query(): Promise<T> {
const expand = this.opt?.expand;
const response = await this.client.fetch(
expand
? `${recordApiBasePath}/${this.apiName}/${this.id}?expand=${expand.join(",")}`
: `${recordApiBasePath}/${this.apiName}/${this.id}`,
);
return (await response.json()) as T;
}
}
export interface ListOpts {
pagination?: Pagination;
order?: string[];
filters?: FilterOrComposite[];
count?: boolean;
expand?: string[];
}
export class ListOperation<T = Record<string, unknown>>
implements DeferredOperation<ListResponse<T>>
{
constructor(
private readonly client: Client,
private readonly apiName: string,
private readonly opts?: ListOpts,
) {}
async query(): Promise<ListResponse<T>> {
const params = new URLSearchParams();
const pagination = opts?.pagination;
const pagination = this.opts?.pagination;
if (pagination) {
const cursor = pagination.cursor;
if (cursor) params.append("cursor", cursor);
@@ -244,12 +357,12 @@ export class RecordApiImpl<T = Record<string, unknown>>
const offset = pagination.offset;
if (offset) params.append("offset", offset.toString());
}
const order = opts?.order;
const order = this.opts?.order;
if (order) params.append("order", order.join(","));
if (opts?.count) params.append("count", "true");
if (this.opts?.count) params.append("count", "true");
const expand = opts?.expand;
const expand = this.opts?.expand;
if (expand) params.append("expand", expand.join(","));
function traverseFilters(path: string, filter: FilterOrComposite) {
@@ -275,75 +388,111 @@ export class RecordApiImpl<T = Record<string, unknown>>
}
}
const filters = opts?.filters;
const filters = this.opts?.filters;
if (filters) {
for (const filter of filters) {
traverseFilters("filter", filter);
}
}
const response = await this.client.fetch(`${this._path}?${params}`);
const response = await this.client.fetch(
`${recordApiBasePath}/${this.apiName}?${params}`,
);
return (await response.json()) as ListResponse<T>;
}
}
export interface RecordApi<T = Record<string, unknown>> {
list(opts?: ListOpts): Promise<ListResponse<T>>;
listOp(opts?: ListOpts): ListOperation<T>;
read(id: RecordId, opt?: ReadOpts): Promise<T>;
readOp(id: RecordId, opt?: ReadOpts): ReadOperation<T>;
create(record: T): Promise<RecordId>;
createOp(record: T): CreateOperation<T>;
// TODO: Retire in favor of `client.execute`.
createBulk(records: T[]): Promise<RecordId[]>;
update(id: RecordId, record: Partial<T>): Promise<void>;
updateOp(id: RecordId, record: Partial<T>): UpdateOperation;
delete(id: RecordId): Promise<void>;
deleteOp(id: RecordId): DeleteOperation;
subscribe(id: RecordId): Promise<ReadableStream<Event>>;
}
/// Provides CRUD access to records through TrailBase's record API.
export class RecordApiImpl<T = Record<string, unknown>>
implements RecordApi<T>
{
constructor(
private readonly client: Client,
private readonly name: string,
) {}
public async list(opts?: ListOpts): Promise<ListResponse<T>> {
return new ListOperation<T>(this.client, this.name, opts).query();
}
public listOp(opts?: ListOpts): ListOperation<T> {
return new ListOperation<T>(this.client, this.name, opts);
}
public async read<T = Record<string, unknown>>(
id: string | number,
opt?: {
expand?: string[];
},
id: RecordId,
opt?: ReadOpts,
): Promise<T> {
const expand = opt?.expand;
const response = await this.client.fetch(
expand
? `${this._path}/${id}?expand=${expand.join(",")}`
: `${this._path}/${id}`,
);
return (await response.json()) as T;
return new ReadOperation<T>(this.client, this.name, id, opt).query();
}
public async create<T = Record<string, unknown>>(
record: T,
): Promise<string | number> {
const response = await this.client.fetch(this._path, {
method: "POST",
body: JSON.stringify(record),
headers: jsonContentTypeHeader,
});
return (await response.json()).ids[0];
public readOp(id: RecordId, opt?: ReadOpts): ReadOperation<T> {
return new ReadOperation<T>(this.client, this.name, id, opt);
}
public async create(record: T): Promise<RecordId> {
return new CreateOperation<T>(this.client, this.name, record).query();
}
public createOp(record: T): CreateOperation<T> {
return new CreateOperation<T>(this.client, this.name, record);
}
public async createBulk<T = Record<string, unknown>>(
records: T[],
): Promise<(string | number)[]> {
const response = await this.client.fetch(this._path, {
method: "POST",
body: JSON.stringify(records),
headers: jsonContentTypeHeader,
});
): Promise<RecordId[]> {
const response = await this.client.fetch(
`${recordApiBasePath}/${this.name}`,
{
method: "POST",
body: JSON.stringify(records),
headers: jsonContentTypeHeader,
},
);
return (await response.json()).ids;
}
public async update<T = Record<string, unknown>>(
id: string | number,
record: Partial<T>,
): Promise<void> {
await this.client.fetch(`${this._path}/${id}`, {
method: "PATCH",
body: JSON.stringify(record),
headers: jsonContentTypeHeader,
});
public async update(id: RecordId, record: Partial<T>): Promise<void> {
return new UpdateOperation<T>(this.client, this.name, id, record).query();
}
public async delete(id: string | number): Promise<void> {
await this.client.fetch(`${this._path}/${id}`, {
method: "DELETE",
});
public updateOp(id: RecordId, record: Partial<T>): UpdateOperation<T> {
return new UpdateOperation<T>(this.client, this.name, id, record);
}
public async subscribe(id: string | number): Promise<ReadableStream<Event>> {
const response = await this.client.fetch(`${this._path}/subscribe/${id}`);
public async delete(id: RecordId): Promise<void> {
return new DeleteOperation(this.client, this.name, id).query();
}
public deleteOp(id: RecordId): DeleteOperation {
return new DeleteOperation(this.client, this.name, id);
}
public async subscribe(id: RecordId): Promise<ReadableStream<Event>> {
const response = await this.client.fetch(
`${recordApiBasePath}/${this.name}/subscribe/${id}`,
);
const body = response.body;
if (!body) {
throw Error("Subscription reader is null.");
@@ -352,8 +501,8 @@ export class RecordApiImpl<T = Record<string, unknown>>
const decoder = new TextDecoder();
const transformStream = new TransformStream<Uint8Array, Event>({
transform(chunk: Uint8Array, controller) {
const msgs = decoder.decode(chunk).trimEnd().split("\n\n");
for (const msg of msgs) {
const messages = decoder.decode(chunk).trimEnd().split("\n\n");
for (const msg of messages) {
if (msg.startsWith("data: ")) {
controller.enqueue(JSON.parse(msg.substring(6)));
}
@@ -428,6 +577,12 @@ export interface Client {
///
/// Unlike native fetch, will throw in case !response.ok.
fetch(path: string, init?: FetchOptions): Promise<Response>;
/// Execute a batch query.
execute(
operations: (CreateOperation | UpdateOperation | DeleteOperation)[],
transaction?: boolean,
): Promise<RecordId[]>;
}
/// Client for interacting with TrailBase auth and record APIs.
@@ -485,6 +640,20 @@ class ClientImpl implements Client {
return new RecordApiImpl<T>(this, name);
}
/// Execute a batch query.
async execute(
operations: (CreateOperation | UpdateOperation | DeleteOperation)[],
transaction: boolean = true,
): Promise<RecordId[]> {
const response = await this.fetch(transactionApiBasePath, {
method: "POST",
body: JSON.stringify({ operations, transaction }),
headers: jsonContentTypeHeader,
});
return (await response.json()).ids;
}
public avatarUrl(userId?: string): string | undefined {
const id = userId ?? this.user()?.id;
if (id) {
@@ -681,10 +850,11 @@ export async function initClientFromCookies(
const recordApiBasePath = "/api/records/v1";
const authApiBasePath = "/api/auth/v1";
const transactionApiBasePath = "/api/transaction/v1/execute";
export function filePath(
apiName: string,
recordId: string | number,
recordId: RecordId,
columnName: string,
): string {
return `${recordApiBasePath}/${apiName}/${recordId}/file/${columnName}`;
@@ -692,7 +862,7 @@ export function filePath(
export function filesPath(
apiName: string,
recordId: string | number,
recordId: RecordId,
columnName: string,
index: number,
): string {
@@ -64,7 +64,8 @@ test("auth integration tests", async () => {
test("Record integration tests", async () => {
const client = await connect();
const api = client.records<NewSimpleStrict>("simple_strict_table");
const apiName = "simple_strict_table";
const api = client.records<NewSimpleStrict>(apiName);
const now = new Date().getTime();
// Throw in some url characters for good measure.
@@ -86,6 +87,27 @@ test("Record integration tests", async () => {
expect(bulkIds.length).toBe(2);
}
{
const op: {
Create: {
api_name: string;
value: Record<string, unknown>;
};
} = JSON.parse(JSON.stringify(api.createOp({ text_not_null: "test" })));
expect(op.Create.api_name).toBe(apiName);
expect(op.Create.value.text_not_null).toBe("test");
const bulkIds = await client.execute(
[
api.createOp({ text_not_null: "ts bulk execute 0" }),
api.createOp({ text_not_null: "ts bulk execute 1" }),
],
false,
);
expect(bulkIds.length).toBe(2);
}
{
const response = await api.list({
filters: [
@@ -121,7 +143,7 @@ test("Record integration tests", async () => {
}
{
const response = await api.list<SimpleStrict>({
const response = await api.list({
filters: [
{
column: "text_not_null",
@@ -136,20 +158,20 @@ test("Record integration tests", async () => {
).toStrictEqual(messages);
}
const record: SimpleStrict = await api.read(ids[0]);
const record = await api.read(ids[0]);
expect(record.id).toStrictEqual(ids[0]);
expect(record.text_not_null).toStrictEqual(messages[0]);
// Test 1:1 view-bases record API.
const view_record: SimpleCompleteView = await client
.records("simple_complete_view")
.records<SimpleCompleteView>("simple_complete_view")
.read(ids[0]);
expect(view_record.id).toStrictEqual(ids[0]);
expect(view_record.text_not_null).toStrictEqual(messages[0]);
// Test view-based record API with column renames.
const subset_view_record: SimpleSubsetView = await client
.records("simple_subset_view")
.records<SimpleSubsetView>("simple_subset_view")
.read(ids[0]);
expect(subset_view_record.id).toStrictEqual(ids[0]);
expect(subset_view_record.t_not_null).toStrictEqual(messages[0]);
@@ -160,7 +182,7 @@ test("Record integration tests", async () => {
text_null: "updated null",
};
await api.update(ids[1], updated_value);
const updated_record: SimpleStrict = await api.read(ids[1]);
const updated_record = await api.read(ids[1]);
expect(updated_record).toEqual(
expect.objectContaining({
id: ids[1],
@@ -173,9 +195,7 @@ test("Record integration tests", async () => {
expect(await client.logout()).toBe(true);
expect(client.user()).toBe(undefined);
await expect(
async () => await api.read<SimpleStrict>(ids[0]),
).rejects.toThrowError(
await expect(async () => await api.read(ids[0])).rejects.toThrowError(
expect.objectContaining({
status: status.FORBIDDEN,
}),
@@ -206,10 +226,10 @@ type Comment = {
test("expand foreign records", async () => {
const client = await connect();
const api = client.records("comment");
const api = client.records<Comment>("comment");
{
const comment = await api.read<Comment>(1);
const comment = await api.read(1);
expect(comment.id).toBe(1);
expect(comment.body).toBe("first comment");
expect(comment.author.data).toBeUndefined();
@@ -217,7 +237,7 @@ test("expand foreign records", async () => {
}
{
const comment = await api.read<Comment>(1, { expand: ["post"] });
const comment = await api.read(1, { expand: ["post"] });
expect(comment.id).toBe(1);
expect(comment.body).toBe("first comment");
expect(comment.author.data).toBeUndefined();
@@ -225,7 +245,7 @@ test("expand foreign records", async () => {
}
{
const response = await api.list<Comment>({
const response = await api.list({
expand: ["author", "post"],
order: ["-id"],
pagination: {
@@ -243,7 +263,7 @@ test("expand foreign records", async () => {
}
{
const response = await api.list<Comment>({
const response = await api.list({
expand: ["author", "post"],
order: ["-id"],
pagination: {
@@ -254,7 +274,7 @@ test("expand foreign records", async () => {
expect(response.records.length).toBe(2);
const second = response.records[1];
const offsetResponse = await api.list<Comment>({
const offsetResponse = await api.list({
expand: ["author", "post"],
order: ["-id"],
pagination: {
@@ -278,7 +298,7 @@ test("record error tests", async () => {
);
const nonExistantApi = client.records("non-existant");
await expect(
async () => await nonExistantApi.read<SimpleStrict>(nonExistantId),
async () => await nonExistantApi.read(nonExistantId),
).rejects.toThrowError(
expect.objectContaining({
status: status.METHOD_NOT_ALLOWED,
@@ -286,16 +306,12 @@ test("record error tests", async () => {
);
const api = client.records("simple_strict_table");
await expect(
async () => await api.read<SimpleStrict>("invalid id"),
).rejects.toThrowError(
await expect(async () => await api.read("invalid id")).rejects.toThrowError(
expect.objectContaining({
status: status.BAD_REQUEST,
}),
);
await expect(
async () => await api.read<SimpleStrict>(nonExistantId),
).rejects.toThrowError(
await expect(async () => await api.read(nonExistantId)).rejects.toThrowError(
expect.objectContaining({
status: status.NOT_FOUND,
}),
@@ -304,11 +320,11 @@ test("record error tests", async () => {
test("realtime subscribe specific record tests", async () => {
const client = await connect();
const api = client.records("simple_strict_table");
const api = client.records<NewSimpleStrict>("simple_strict_table");
const now = new Date().getTime();
const createMessage = `ts client realtime test 0: =?&${now}`;
const id = (await api.create<NewSimpleStrict>({
const id = (await api.create({
text_not_null: createMessage,
})) as string;
@@ -331,14 +347,57 @@ test("realtime subscribe specific record tests", async () => {
expect(events[1]["Delete"]["text_not_null"]).equals(updatedMessage);
});
test("transaction tests", async () => {
const client = await connect();
const api = client.records<NewSimpleStrict>("simple_strict_table");
const now = new Date().getTime();
// Test transaction with create operation
{
const record = { text_not_null: `ts transaction create test: =?&${now}` };
const ids = await client.execute([api.createOp(record)]);
expect(ids).toHaveLength(1);
// Verify record was created
const createdRecord = await api.read(ids[0]);
expect(createdRecord.text_not_null).toBe(record.text_not_null);
}
// Test transaction with update operation
{
const record = {
text_not_null: `ts transaction update test original: =?&${now}`,
};
const id = await api.create(record);
const updatedRecord = {
text_not_null: `ts transaction update test modified: =?&${now}`,
};
await client.execute([api.updateOp(id, updatedRecord)]);
const readRecord = await api.read(id);
expect(readRecord.text_not_null).toBe(updatedRecord.text_not_null);
}
// Test transaction with delete operation
{
const record = { text_not_null: `ts transaction delete test: =?&${now}` };
const id = await api.create(record);
await client.execute([api.deleteOp(id)]);
await expect(api.read(id)).rejects.toThrow();
}
});
test("realtime subscribe table tests", async () => {
const client = await connect();
const api = client.records("simple_strict_table");
const api = client.records<NewSimpleStrict>("simple_strict_table");
const eventStream = await api.subscribe("*");
const now = new Date().getTime();
const createMessage = `ts client realtime test 0: =?&${now}`;
const id = (await api.create<NewSimpleStrict>({
const id = (await api.create({
text_not_null: createMessage,
})) as string;
+35 -15
View File
@@ -31,6 +31,7 @@ pub enum Operation {
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
pub struct TransactionRequest {
operations: Vec<Operation>,
transaction: Option<bool>,
}
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
@@ -187,25 +188,42 @@ pub async fn record_transactions_handler(
})
.collect::<Result<Vec<_>, _>>()?;
let ids = state
.conn()
.call(
move |conn: &mut rusqlite::Connection| -> Result<Vec<String>, trailbase_sqlite::Error> {
let tx = conn.transaction()?;
let ids = if request.transaction.unwrap_or(true) {
state
.conn()
.call(
move |conn: &mut rusqlite::Connection| -> Result<Vec<String>, trailbase_sqlite::Error> {
let tx = conn.transaction()?;
let mut ids: Vec<String> = vec![];
for op in operations {
if let Some(id) = op(&tx).map_err(|err| trailbase_sqlite::Error::Other(err.into()))? {
ids.push(id);
let mut ids: Vec<String> = vec![];
for op in operations {
if let Some(id) = op(&tx).map_err(|err| trailbase_sqlite::Error::Other(err.into()))? {
ids.push(id);
}
}
}
tx.commit()?;
tx.commit()?;
return Ok(ids);
},
)
.await?;
return Ok(ids);
},
)
.await?
} else {
state
.conn()
.call(
move |conn: &mut rusqlite::Connection| -> Result<Vec<String>, trailbase_sqlite::Error> {
let mut ids: Vec<String> = vec![];
for op in operations {
if let Some(id) = op(conn).map_err(|err| trailbase_sqlite::Error::Other(err.into()))? {
ids.push(id);
}
}
return Ok(ids);
},
)
.await?
};
return Ok(Json(TransactionResponse { ids }));
}
@@ -304,6 +322,7 @@ mod tests {
value: json!({"value": 2}),
},
],
transaction: None,
}),
)
.await
@@ -325,6 +344,7 @@ mod tests {
value: json!({"value": 3}),
},
],
transaction: None,
}),
)
.await