First version of a subscription API for the dotnet client.

This commit is contained in:
Sebastian Jeltsch
2025-01-13 15:32:37 +01:00
parent acdc888653
commit d295330e12
3 changed files with 187 additions and 10 deletions

View File

@@ -144,16 +144,17 @@ class ThinClient {
string site;
public ThinClient(string site) {
internal ThinClient(string site) {
this.site = site;
}
public async Task<HttpResponseMessage> Fetch(
internal async Task<HttpResponseMessage> Fetch(
String path,
TokenState tokenState,
HttpContent? data,
HttpMethod? method,
Dictionary<string, string>? queryParams
Dictionary<string, string>? queryParams,
HttpCompletionOption completion = HttpCompletionOption.ResponseContentRead
) {
if (path.StartsWith('/')) {
throw new ArgumentException("Path starts with '/'. Relative path expected.");
@@ -183,7 +184,7 @@ class ThinClient {
}
}
return await client.SendAsync(httpRequestMessage);
return await client.SendAsync(httpRequestMessage, completion);
}
}
@@ -309,7 +310,8 @@ public class Client {
string path,
HttpMethod? method,
HttpContent? data,
Dictionary<string, string>? queryParams
Dictionary<string, string>? queryParams,
HttpCompletionOption completion = HttpCompletionOption.ResponseContentRead
) {
var ts = tokenState;
var refreshToken = shouldRefresh(tokenState);
@@ -317,7 +319,7 @@ public class Client {
ts = tokenState = await refreshTokensImpl(refreshToken);
}
var response = await client.Fetch(path, ts, data, method, queryParams);
var response = await client.Fetch(path, ts, data, method, queryParams, completion);
if (response.StatusCode != System.Net.HttpStatusCode.OK) {
string errMsg = await response.Content.ReadAsStringAsync();

View File

@@ -303,4 +303,72 @@ public class ClientTest : IClassFixture<ClientTestFixture> {
Assert.Single(records);
}
}
[Fact]
public async Task RealtimeTest() {
var client = new Client($"http://127.0.0.1:{Constants.Port}", null);
await client.Login("admin@localhost", "secret");
var api = client.Records("simple_strict_table");
var tableEventStream = await api.SubscribeAll();
// Dotnet runs tests for multiple target framework versions in parallel.
// Each test currently brings up its own server but pointing at the same
// underlying database file. We include the runtime version in the filter
// query to avoid a race between both tests. This feels a bit hacky.
// Ideally, we'd run the tests sequentially or with better isolation :/.
var now = DateTimeOffset.Now.ToUnixTimeSeconds();
var suffix = $"{now} {System.Environment.Version} static";
var CreateMessage = $"C# client test 0: =?&{suffix}";
RecordId id = await api.Create(
new SimpleStrict(null, null, null, CreateMessage),
SerializeSimpleStrictContext.Default.SimpleStrict
);
var eventStream = await api.Subscribe(id);
var UpdatedMessage = $"C# client update test 0: =?&{suffix}";
await api.Update(
id,
new SimpleStrict(null, null, null, UpdatedMessage),
SerializeSimpleStrictContext.Default.SimpleStrict
);
await api.Delete(id);
List<Event> events = [];
await foreach (Event msg in eventStream) {
events.Add(msg);
}
Assert.Equal(2, events.Count);
Assert.True(events[0] is UpdateEvent);
Assert.Equal(UpdatedMessage, events[0].Value!["text_not_null"]?.ToString());
Assert.True(events[1] is DeleteEvent);
Assert.Equal(UpdatedMessage, events[1].Value!["text_not_null"]?.ToString());
List<Event> tableEvents = [];
await foreach (Event msg in tableEventStream) {
tableEvents.Add(msg);
if (tableEvents.Count >= 3) {
break;
}
}
Assert.Equal(3, tableEvents.Count);
Assert.True(tableEvents[0] is InsertEvent);
Assert.Equal(CreateMessage, tableEvents[0].Value!["text_not_null"]?.ToString());
Assert.True(tableEvents[1] is UpdateEvent);
Assert.Equal(UpdatedMessage, tableEvents[1].Value!["text_not_null"]?.ToString());
Assert.True(tableEvents[2] is DeleteEvent);
Assert.Equal(UpdatedMessage, tableEvents[2].Value!["text_not_null"]?.ToString());
}
}

View File

@@ -57,6 +57,77 @@ public class Pagination {
}
}
public abstract class Event {
public abstract JsonNode? Value { get; }
internal static Event Parse(string message) {
var obj = (JsonObject?)JsonNode.Parse(message);
if (obj != null) {
var insert = obj["Insert"];
if (insert != null) {
return new InsertEvent(insert);
}
var update = obj["Update"];
if (update != null) {
return new UpdateEvent(update);
}
var delete = obj["Delete"];
if (delete != null) {
return new DeleteEvent(delete);
}
var error = obj["Error"];
if (error != null) {
return new ErrorEvent(error.ToString());
}
}
throw new Exception($"Failed to parse {message}");
}
}
public class InsertEvent : Event {
public override JsonNode? Value { get; }
public InsertEvent(JsonNode? value) {
this.Value = value;
}
public override string ToString() => $"InsertEvent({Value})";
}
public class UpdateEvent : Event {
public override JsonNode? Value { get; }
public UpdateEvent(JsonNode? value) {
this.Value = value;
}
public override string ToString() => $"UpdateEvent({Value})";
}
public class DeleteEvent : Event {
public override JsonNode? Value { get; }
public DeleteEvent(JsonNode? value) {
this.Value = value;
}
public override string ToString() => $"DeleteEvent({Value})";
}
public class ErrorEvent : Event {
public override JsonNode? Value { get { return null; } }
public string ErrorMessage { get; }
public ErrorEvent(string errorMsg) {
this.ErrorMessage = errorMsg;
}
public override string ToString() => $"ErrorEvent({ErrorMessage})";
}
[JsonSourceGenerationOptions(WriteIndented = true)]
[JsonSerializable(typeof(ResponseRecordId))]
@@ -96,7 +167,7 @@ public class RecordApi {
public async Task<T?> Read<T>(string id, JsonTypeInfo<T> jsonTypeInfo) => await Read<T>(new UuidRecordId(id), jsonTypeInfo);
public async Task<T?> Read<T>(long id, JsonTypeInfo<T> jsonTypeInfo) => await Read<T>(new IntegerRecordId(id), jsonTypeInfo);
public async Task<HttpContent> ReadImpl(RecordId id) {
private async Task<HttpContent> ReadImpl(RecordId id) {
var response = await client.Fetch(
$"{RecordApi._recordApi}/{name}/{id}",
HttpMethod.Get,
@@ -153,7 +224,7 @@ public class RecordApi {
return JsonSerializer.Deserialize<List<T>>(json, jsonTypeInfo) ?? [];
}
public async Task<HttpContent> ListImpl(
private async Task<HttpContent> ListImpl(
Pagination? pagination,
List<string>? order,
List<string>? filters
@@ -219,7 +290,7 @@ public class RecordApi {
await UpdateImpl(id, recordJson);
}
public async Task UpdateImpl(
private async Task UpdateImpl(
RecordId id,
HttpContent recordJson
) {
@@ -232,11 +303,47 @@ public class RecordApi {
}
public async Task Delete(RecordId id) {
await client.Fetch(
var response = await client.Fetch(
$"{RecordApi._recordApi}/{name}/{id}",
HttpMethod.Delete,
null,
null
);
}
public async Task<IAsyncEnumerable<Event>> Subscribe(RecordId id) {
var response = await SubscribeImpl(id.ToString()!);
return StreamToEnumerableImpl(await response.ReadAsStreamAsync());
}
public async Task<IAsyncEnumerable<Event>> SubscribeAll() {
var response = await SubscribeImpl("*");
return StreamToEnumerableImpl(await response.ReadAsStreamAsync());
}
private async Task<HttpContent> SubscribeImpl(string id) {
var response = await client.Fetch(
$"{RecordApi._recordApi}/{name}/subscribe/{id}",
HttpMethod.Get,
null,
null,
HttpCompletionOption.ResponseHeadersRead
);
return response.Content;
}
private static async IAsyncEnumerable<Event> StreamToEnumerableImpl(Stream stream) {
using (var streamReader = new StreamReader(stream)) {
while (!streamReader.EndOfStream) {
var message = await streamReader.ReadLineAsync();
if (message != null) {
message.Trim();
if (message.StartsWith("data: ")) {
yield return Event.Parse(message.Substring(6));
}
}
}
}
}
}