From d295330e12627e3ac5cbcee8f080eb127410ef0c Mon Sep 17 00:00:00 2001 From: Sebastian Jeltsch Date: Mon, 13 Jan 2025 15:32:37 +0100 Subject: [PATCH] First version of a subscription API for the dotnet client. --- client/trailbase-dotnet/Client.cs | 14 ++-- client/trailbase-dotnet/ClientTest.cs | 68 +++++++++++++++ client/trailbase-dotnet/RecordApi.cs | 115 +++++++++++++++++++++++++- 3 files changed, 187 insertions(+), 10 deletions(-) diff --git a/client/trailbase-dotnet/Client.cs b/client/trailbase-dotnet/Client.cs index e2059e0d..626fe5dc 100644 --- a/client/trailbase-dotnet/Client.cs +++ b/client/trailbase-dotnet/Client.cs @@ -144,16 +144,17 @@ class ThinClient { string site; - public ThinClient(string site) { + internal ThinClient(string site) { this.site = site; } - public async Task Fetch( + internal async Task Fetch( String path, TokenState tokenState, HttpContent? data, HttpMethod? method, - Dictionary? queryParams + Dictionary? queryParams, + HttpCompletionOption completion = HttpCompletionOption.ResponseContentRead ) { if (path.StartsWith('/')) { throw new ArgumentException("Path starts with '/'. Relative path expected."); @@ -183,7 +184,7 @@ class ThinClient { } } - return await client.SendAsync(httpRequestMessage); + return await client.SendAsync(httpRequestMessage, completion); } } @@ -309,7 +310,8 @@ public class Client { string path, HttpMethod? method, HttpContent? data, - Dictionary? queryParams + Dictionary? queryParams, + HttpCompletionOption completion = HttpCompletionOption.ResponseContentRead ) { var ts = tokenState; var refreshToken = shouldRefresh(tokenState); @@ -317,7 +319,7 @@ public class Client { ts = tokenState = await refreshTokensImpl(refreshToken); } - var response = await client.Fetch(path, ts, data, method, queryParams); + var response = await client.Fetch(path, ts, data, method, queryParams, completion); if (response.StatusCode != System.Net.HttpStatusCode.OK) { string errMsg = await response.Content.ReadAsStringAsync(); diff --git a/client/trailbase-dotnet/ClientTest.cs b/client/trailbase-dotnet/ClientTest.cs index d5383925..f1214f75 100644 --- a/client/trailbase-dotnet/ClientTest.cs +++ b/client/trailbase-dotnet/ClientTest.cs @@ -303,4 +303,72 @@ public class ClientTest : IClassFixture { Assert.Single(records); } } + + [Fact] + public async Task RealtimeTest() { + var client = new Client($"http://127.0.0.1:{Constants.Port}", null); + await client.Login("admin@localhost", "secret"); + + var api = client.Records("simple_strict_table"); + + var tableEventStream = await api.SubscribeAll(); + + // Dotnet runs tests for multiple target framework versions in parallel. + // Each test currently brings up its own server but pointing at the same + // underlying database file. We include the runtime version in the filter + // query to avoid a race between both tests. This feels a bit hacky. + // Ideally, we'd run the tests sequentially or with better isolation :/. + var now = DateTimeOffset.Now.ToUnixTimeSeconds(); + var suffix = $"{now} {System.Environment.Version} static"; + var CreateMessage = $"C# client test 0: =?&{suffix}"; + + RecordId id = await api.Create( + new SimpleStrict(null, null, null, CreateMessage), + SerializeSimpleStrictContext.Default.SimpleStrict + ); + + var eventStream = await api.Subscribe(id); + + var UpdatedMessage = $"C# client update test 0: =?&{suffix}"; + await api.Update( + id, + new SimpleStrict(null, null, null, UpdatedMessage), + SerializeSimpleStrictContext.Default.SimpleStrict + ); + + await api.Delete(id); + + List events = []; + await foreach (Event msg in eventStream) { + events.Add(msg); + } + + Assert.Equal(2, events.Count); + + Assert.True(events[0] is UpdateEvent); + Assert.Equal(UpdatedMessage, events[0].Value!["text_not_null"]?.ToString()); + + Assert.True(events[1] is DeleteEvent); + Assert.Equal(UpdatedMessage, events[1].Value!["text_not_null"]?.ToString()); + + List tableEvents = []; + await foreach (Event msg in tableEventStream) { + tableEvents.Add(msg); + + if (tableEvents.Count >= 3) { + break; + } + } + + Assert.Equal(3, tableEvents.Count); + + Assert.True(tableEvents[0] is InsertEvent); + Assert.Equal(CreateMessage, tableEvents[0].Value!["text_not_null"]?.ToString()); + + Assert.True(tableEvents[1] is UpdateEvent); + Assert.Equal(UpdatedMessage, tableEvents[1].Value!["text_not_null"]?.ToString()); + + Assert.True(tableEvents[2] is DeleteEvent); + Assert.Equal(UpdatedMessage, tableEvents[2].Value!["text_not_null"]?.ToString()); + } } diff --git a/client/trailbase-dotnet/RecordApi.cs b/client/trailbase-dotnet/RecordApi.cs index 44b84510..88151425 100644 --- a/client/trailbase-dotnet/RecordApi.cs +++ b/client/trailbase-dotnet/RecordApi.cs @@ -57,6 +57,77 @@ public class Pagination { } } +public abstract class Event { + public abstract JsonNode? Value { get; } + + internal static Event Parse(string message) { + var obj = (JsonObject?)JsonNode.Parse(message); + if (obj != null) { + var insert = obj["Insert"]; + if (insert != null) { + return new InsertEvent(insert); + } + + var update = obj["Update"]; + if (update != null) { + return new UpdateEvent(update); + } + + var delete = obj["Delete"]; + if (delete != null) { + return new DeleteEvent(delete); + } + + var error = obj["Error"]; + if (error != null) { + return new ErrorEvent(error.ToString()); + } + } + + throw new Exception($"Failed to parse {message}"); + } +} + +public class InsertEvent : Event { + public override JsonNode? Value { get; } + + public InsertEvent(JsonNode? value) { + this.Value = value; + } + + public override string ToString() => $"InsertEvent({Value})"; +} + +public class UpdateEvent : Event { + public override JsonNode? Value { get; } + + public UpdateEvent(JsonNode? value) { + this.Value = value; + } + + public override string ToString() => $"UpdateEvent({Value})"; +} + +public class DeleteEvent : Event { + public override JsonNode? Value { get; } + + public DeleteEvent(JsonNode? value) { + this.Value = value; + } + + public override string ToString() => $"DeleteEvent({Value})"; +} + +public class ErrorEvent : Event { + public override JsonNode? Value { get { return null; } } + public string ErrorMessage { get; } + + public ErrorEvent(string errorMsg) { + this.ErrorMessage = errorMsg; + } + + public override string ToString() => $"ErrorEvent({ErrorMessage})"; +} [JsonSourceGenerationOptions(WriteIndented = true)] [JsonSerializable(typeof(ResponseRecordId))] @@ -96,7 +167,7 @@ public class RecordApi { public async Task Read(string id, JsonTypeInfo jsonTypeInfo) => await Read(new UuidRecordId(id), jsonTypeInfo); public async Task Read(long id, JsonTypeInfo jsonTypeInfo) => await Read(new IntegerRecordId(id), jsonTypeInfo); - public async Task ReadImpl(RecordId id) { + private async Task ReadImpl(RecordId id) { var response = await client.Fetch( $"{RecordApi._recordApi}/{name}/{id}", HttpMethod.Get, @@ -153,7 +224,7 @@ public class RecordApi { return JsonSerializer.Deserialize>(json, jsonTypeInfo) ?? []; } - public async Task ListImpl( + private async Task ListImpl( Pagination? pagination, List? order, List? filters @@ -219,7 +290,7 @@ public class RecordApi { await UpdateImpl(id, recordJson); } - public async Task UpdateImpl( + private async Task UpdateImpl( RecordId id, HttpContent recordJson ) { @@ -232,11 +303,47 @@ public class RecordApi { } public async Task Delete(RecordId id) { - await client.Fetch( + var response = await client.Fetch( $"{RecordApi._recordApi}/{name}/{id}", HttpMethod.Delete, null, null ); } + + public async Task> Subscribe(RecordId id) { + var response = await SubscribeImpl(id.ToString()!); + return StreamToEnumerableImpl(await response.ReadAsStreamAsync()); + } + + public async Task> SubscribeAll() { + var response = await SubscribeImpl("*"); + return StreamToEnumerableImpl(await response.ReadAsStreamAsync()); + } + + private async Task SubscribeImpl(string id) { + var response = await client.Fetch( + $"{RecordApi._recordApi}/{name}/subscribe/{id}", + HttpMethod.Get, + null, + null, + HttpCompletionOption.ResponseHeadersRead + ); + + return response.Content; + } + + private static async IAsyncEnumerable StreamToEnumerableImpl(Stream stream) { + using (var streamReader = new StreamReader(stream)) { + while (!streamReader.EndOfStream) { + var message = await streamReader.ReadLineAsync(); + if (message != null) { + message.Trim(); + if (message.StartsWith("data: ")) { + yield return Event.Parse(message.Substring(6)); + } + } + } + } + } }