Add support for record-filtered subscriptions to dotnet client.

This commit is contained in:
Sebastian Jeltsch
2025-09-27 10:46:37 +02:00
parent 280ec8c5df
commit 56c78c2ad8
3 changed files with 121 additions and 64 deletions
+3 -3
View File
@@ -435,17 +435,17 @@ class RecordApi {
}
Future<Stream<Event>> subscribe(RecordId id) async {
return await _subscribeImpl(id: id);
return await _subscribeImpl(id: id.toString());
}
Future<Stream<Event>> subscribeAll({
List<FilterBase>? filters,
}) async {
return await _subscribeImpl(id: '*'.id(), filters: filters);
return await _subscribeImpl(id: '*', filters: filters);
}
Future<Stream<Event>> _subscribeImpl({
required RecordId id,
required String id,
List<FilterBase>? filters,
}) async {
final params = <String, String>{};
+61 -13
View File
@@ -167,9 +167,7 @@ public class ClientTest : IClassFixture<ClientTestFixture> {
null,
false
)!;
Console.WriteLine("FFFFIIII");
Assert.Single(response.records);
Console.WriteLine("FFFFIIII AFTER");
Assert.Null(response.total_count);
Assert.Equal(messages[0], response.records[0].text_not_null);
}
@@ -382,21 +380,20 @@ public class ClientTest : IClassFixture<ClientTestFixture> {
// underlying database file. We include the runtime version in the filter
// query to avoid a race between both tests. This feels a bit hacky.
// Ideally, we'd run the tests sequentially or with better isolation :/.
var now = DateTimeOffset.Now.ToUnixTimeSeconds();
var suffix = $"{now} {System.Environment.Version} static";
var CreateMessage = $"C# client test 0: =?&{suffix}";
var suffix = $"{DateTimeOffset.Now.ToUnixTimeSeconds()} {System.Environment.Version} static";
var createMessage = $"C# client realtime test 0: =?&{suffix}";
RecordId id = await api.Create(
new SimpleStrict(null, null, null, CreateMessage),
new SimpleStrict(null, null, null, createMessage),
SerializeSimpleStrictContext.Default.SimpleStrict
);
var eventStream = await api.Subscribe(id);
var UpdatedMessage = $"C# client update test 0: =?&{suffix}";
var updatedMessage = $"C# client realtime update test 0: =?&{suffix}";
await api.Update(
id,
new SimpleStrict(null, null, null, UpdatedMessage),
new SimpleStrict(null, null, null, updatedMessage),
SerializeSimpleStrictContext.Default.SimpleStrict
);
@@ -410,15 +407,16 @@ public class ClientTest : IClassFixture<ClientTestFixture> {
Assert.Equal(2, events.Count);
Assert.True(events[0] is UpdateEvent);
Assert.Equal(UpdatedMessage, events[0].Value!["text_not_null"]?.ToString());
Assert.Equal(updatedMessage, events[0].Value!["text_not_null"]?.ToString());
Assert.True(events[1] is DeleteEvent);
Assert.Equal(UpdatedMessage, events[1].Value!["text_not_null"]?.ToString());
Assert.Equal(updatedMessage, events[1].Value!["text_not_null"]?.ToString());
List<Event> tableEvents = [];
await foreach (Event msg in tableEventStream) {
tableEvents.Add(msg);
// TODO: Maybe use a timeout instead.
if (tableEvents.Count >= 3) {
break;
}
@@ -427,12 +425,62 @@ public class ClientTest : IClassFixture<ClientTestFixture> {
Assert.Equal(3, tableEvents.Count);
Assert.True(tableEvents[0] is InsertEvent);
Assert.Equal(CreateMessage, tableEvents[0].Value!["text_not_null"]?.ToString());
Assert.Equal(createMessage, tableEvents[0].Value!["text_not_null"]?.ToString());
Assert.True(tableEvents[1] is UpdateEvent);
Assert.Equal(UpdatedMessage, tableEvents[1].Value!["text_not_null"]?.ToString());
Assert.Equal(updatedMessage, tableEvents[1].Value!["text_not_null"]?.ToString());
Assert.True(tableEvents[2] is DeleteEvent);
Assert.Equal(UpdatedMessage, tableEvents[2].Value!["text_not_null"]?.ToString());
Assert.Equal(updatedMessage, tableEvents[2].Value!["text_not_null"]?.ToString());
}
[Fact]
public async Task RealtimeTableSubscriptionWithFilterTest() {
var client = await ClientTest.Connect();
var api = client.Records("simple_strict_table");
// Dotnet runs tests for multiple target framework versions in parallel.
// Each test currently brings up its own server but pointing at the same
// underlying database file. We include the runtime version in the filter
// query to avoid a race between both tests. This feels a bit hacky.
// Ideally, we'd run the tests sequentially or with better isolation :/.
var suffix = $"{DateTimeOffset.Now.ToUnixTimeSeconds()} {System.Environment.Version} static";
var updatedMessage = $"C# client updated realtime test 42: {suffix}";
var tableEventStream = await api.SubscribeAll(filters: [new Filter(column: "text_not_null", value: updatedMessage)]);
var createMessage = $"C# client realtime test 42: =?&{suffix}";
RecordId id = await api.Create(
new SimpleStrict(null, null, null, createMessage),
SerializeSimpleStrictContext.Default.SimpleStrict
);
var eventStream = await api.Subscribe(id);
await api.Update(
id,
new SimpleStrict(null, null, null, updatedMessage),
SerializeSimpleStrictContext.Default.SimpleStrict
);
await api.Delete(id);
List<Event> events = [];
await foreach (Event msg in tableEventStream) {
events.Add(msg);
// TODO: Maybe use a timeout instead.
if (events.Count >= 2) {
break;
}
}
Assert.Equal(2, events.Count);
Assert.True(events[0] is UpdateEvent);
Assert.Equal(updatedMessage, events[0].Value!["text_not_null"]?.ToString());
Assert.True(events[1] is DeleteEvent);
Assert.Equal(updatedMessage, events[1].Value!["text_not_null"]?.ToString());
}
}
+57 -48
View File
@@ -227,7 +227,50 @@ public enum CompareOp {
}
/// <summary>Abstract base class for filters.</summary>
public abstract class FilterBase { }
public abstract class FilterBase {
/// <summary>Helper function to traverse nested filters and add them to the "queryParams".</summary>
internal static void addFiltersToParams(ref Dictionary<string, string> queryParams, String path, FilterBase filter) {
String op(CompareOp op) {
return op switch {
CompareOp.Equal => "$eq",
CompareOp.NotEqual => "$eq",
CompareOp.LessThan => "$lt",
CompareOp.LessThanEqual => "$lte",
CompareOp.GreaterThan => "$gt",
CompareOp.GreaterThanEqual => "$gte",
CompareOp.Like => "$like",
CompareOp.Regexp => "$re",
_ => "??",
};
}
switch (filter) {
case Filter f:
if (f.op != null) {
var o = op((CompareOp)f.op);
queryParams.Add($"{path}[{f.column}][{o}]", f.value);
}
else {
queryParams.Add($"{path}[{f.column}]", f.value);
}
break;
case And f:
var i = 0;
foreach (var fil in f.filters) {
addFiltersToParams(ref queryParams, $"{path}[$and][{i++}]", fil);
}
break;
case Or f:
var j = 0;
foreach (var fil in f.filters) {
addFiltersToParams(ref queryParams, $"{path}[$or][{j++}]", fil);
}
break;
default:
break;
}
}
}
/// <summary>Column filters.</summary>
sealed public class Filter : FilterBase {
@@ -473,50 +516,8 @@ public class RecordApi {
param.Add("expand", String.Join(",", expand.ToArray()));
}
String op(CompareOp op) {
return op switch {
CompareOp.Equal => "$eq",
CompareOp.NotEqual => "$eq",
CompareOp.LessThan => "$lt",
CompareOp.LessThanEqual => "$lte",
CompareOp.GreaterThan => "$gt",
CompareOp.GreaterThanEqual => "$gte",
CompareOp.Like => "$like",
CompareOp.Regexp => "$re",
_ => "??",
};
}
void traverseFilters(String path, FilterBase filter) {
switch (filter) {
case Filter f:
if (f.op != null) {
var o = op((CompareOp)f.op);
param.Add($"{path}[{f.column}][{o}]", f.value);
}
else {
param.Add($"{path}[{f.column}]", f.value);
}
break;
case And f:
var i = 0;
foreach (var fil in f.filters) {
traverseFilters($"{path}[$and][{i++}]", fil);
}
break;
case Or f:
var j = 0;
foreach (var fil in f.filters) {
traverseFilters($"{path}[$or][{j++}]", fil);
}
break;
default:
break;
}
}
foreach (var filter in filters ?? []) {
traverseFilters("filter", filter);
FilterBase.addFiltersToParams(ref param, "filter", filter);
}
var response = await client.Fetch(
@@ -582,17 +583,25 @@ public class RecordApi {
}
/// <summary>Listen for all accessible changes to this Record API.</summary>
public async Task<IAsyncEnumerable<Event>> SubscribeAll() {
var response = await SubscribeImpl("*");
public async Task<IAsyncEnumerable<Event>> SubscribeAll(List<FilterBase>? filters = null) {
var response = await SubscribeImpl("*", filters);
return StreamToEnumerableImpl(await response.ReadAsStreamAsync());
}
private async Task<HttpContent> SubscribeImpl(string id) {
private async Task<HttpContent> SubscribeImpl(string id, List<FilterBase>? filters = null) {
Dictionary<string, string>? queryParams = null;
if (filters != null) {
queryParams = new Dictionary<string, string>();
foreach (var filter in filters ?? []) {
FilterBase.addFiltersToParams(ref queryParams, "filter", filter);
}
}
var response = await client.Fetch(
$"{RecordApi._recordApi}/{name}/subscribe/{id}",
HttpMethod.Get,
null,
null,
queryParams,
HttpCompletionOption.ResponseHeadersRead
);