mirror of
https://github.com/trailbaseio/trailbase.git
synced 2026-05-19 07:49:57 -05:00
Python client: add event abstraction, parsing of programmatic error and sequence numbers.
This commit is contained in:
@@ -1,6 +1,22 @@
|
||||
from trailbase import Client, CompareOp, FetchException, Filter, RecordId, JSON, JSON_OBJECT
|
||||
from trailbase import (
|
||||
EVENT_ERROR_STATUS_FORBIDDEN,
|
||||
ErrorEvent,
|
||||
parseEvent,
|
||||
Client,
|
||||
CompareOp,
|
||||
FetchException,
|
||||
Filter,
|
||||
InsertEvent,
|
||||
UpdateEvent,
|
||||
DeleteEvent,
|
||||
RecordId,
|
||||
JSON,
|
||||
JSON_OBJECT,
|
||||
EVENT,
|
||||
)
|
||||
|
||||
import httpx
|
||||
import json
|
||||
import logging
|
||||
import mintotp # type: ignore
|
||||
import os
|
||||
@@ -8,7 +24,7 @@ import pytest
|
||||
import subprocess
|
||||
|
||||
from time import time, sleep
|
||||
from typing import List
|
||||
from typing import List, cast
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
@@ -283,26 +299,77 @@ def test_expand_foreign_records(trailbase: TrailBaseFixture):
|
||||
assert second == offset_comments.records[0]
|
||||
|
||||
|
||||
def test_parse_event():
|
||||
err_json: str = """
|
||||
{
|
||||
"Error": {
|
||||
"status": 1,
|
||||
"message": "test"
|
||||
},
|
||||
"seq": 3
|
||||
}
|
||||
"""
|
||||
|
||||
err_event = cast(ErrorEvent | None, parseEvent(json.loads(err_json)))
|
||||
assert err_event is not None
|
||||
assert err_event.seq == 3
|
||||
assert err_event.status == EVENT_ERROR_STATUS_FORBIDDEN
|
||||
assert err_event.message == "test"
|
||||
|
||||
update_json: str = """
|
||||
{
|
||||
"Update": {
|
||||
"col0": "val0",
|
||||
"col1": 4
|
||||
},
|
||||
"seq": 4
|
||||
}
|
||||
"""
|
||||
|
||||
update_event = cast(UpdateEvent | None, parseEvent(json.loads(update_json)))
|
||||
assert update_event is not None
|
||||
assert update_event.seq == 4
|
||||
|
||||
|
||||
def test_subscriptions(trailbase: TrailBaseFixture):
|
||||
assert trailbase.isUp()
|
||||
|
||||
client = connect()
|
||||
api = client.records("simple_strict_table")
|
||||
|
||||
table_subscription = api.subscribe("*")
|
||||
table_subscription = api.subscribe_all()
|
||||
|
||||
now = int(time())
|
||||
create_message = f"python client test 0: =?&{now}"
|
||||
api.create({"text_not_null": create_message})
|
||||
create_message = f"python client subscription test 0: =?&{now}"
|
||||
id = api.create({"text_not_null": create_message})
|
||||
|
||||
events: List[dict[str, JSON]] = []
|
||||
update_message = f"python client subscription test 1: =?&{now}"
|
||||
api.update(id, {"text_not_null": update_message})
|
||||
|
||||
api.delete(id)
|
||||
|
||||
events: List[EVENT] = []
|
||||
for ev in table_subscription:
|
||||
events.append(ev)
|
||||
break
|
||||
if len(events) == 3:
|
||||
break
|
||||
|
||||
table_subscription.close()
|
||||
|
||||
assert "Insert" in events[0]
|
||||
ev0 = events[0]
|
||||
assert type(ev0) is InsertEvent
|
||||
assert ev0.seq == 1
|
||||
assert ev0.value["text_not_null"] == create_message
|
||||
|
||||
ev1 = events[1]
|
||||
assert type(ev1) is UpdateEvent
|
||||
assert ev1.seq == 2
|
||||
assert ev1.value["text_not_null"] == update_message
|
||||
|
||||
ev2 = events[2]
|
||||
assert type(ev2) is DeleteEvent
|
||||
assert ev2.seq == 3
|
||||
assert ev2.value["text_not_null"] == update_message
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -203,6 +203,77 @@ class TokenState:
|
||||
return base
|
||||
|
||||
|
||||
class Event:
|
||||
seq: int | None
|
||||
|
||||
def __init__(self, seq: int | None):
|
||||
self.seq = seq
|
||||
|
||||
|
||||
class InsertEvent(Event):
|
||||
value: JSON_OBJECT
|
||||
|
||||
def __init__(self, seq: int | None, value: JSON_OBJECT):
|
||||
super().__init__(seq)
|
||||
self.value = value
|
||||
|
||||
|
||||
class UpdateEvent(Event):
|
||||
value: JSON_OBJECT
|
||||
|
||||
def __init__(self, seq: int | None, value: JSON_OBJECT):
|
||||
super().__init__(seq)
|
||||
self.value = value
|
||||
|
||||
|
||||
class DeleteEvent(Event):
|
||||
value: JSON_OBJECT
|
||||
|
||||
def __init__(self, seq: int | None, value: JSON_OBJECT):
|
||||
super().__init__(seq)
|
||||
self.value = value
|
||||
|
||||
|
||||
class ErrorEvent(Event):
|
||||
status: int
|
||||
message: str | None
|
||||
|
||||
def __init__(self, seq: int | None, status: int, message: str | None):
|
||||
super().__init__(seq)
|
||||
self.status = status
|
||||
self.message = message
|
||||
|
||||
|
||||
EVENT_ERROR_STATUS_UNKNOWN = 0
|
||||
EVENT_ERROR_STATUS_FORBIDDEN = 1
|
||||
EVENT_ERROR_STATUS_LOSS = 2
|
||||
|
||||
|
||||
EVENT: TypeAlias = UpdateEvent | InsertEvent | DeleteEvent | ErrorEvent
|
||||
|
||||
|
||||
def parseEvent(obj: JSON_OBJECT) -> EVENT | None:
|
||||
seq = cast(int | None, obj["seq"])
|
||||
|
||||
insert = obj.get("Insert")
|
||||
if insert is not None:
|
||||
return InsertEvent(seq, cast(JSON_OBJECT, insert))
|
||||
|
||||
update = obj.get("Update")
|
||||
if update is not None:
|
||||
return UpdateEvent(seq, cast(JSON_OBJECT, update))
|
||||
|
||||
delete = obj.get("Delete")
|
||||
if delete is not None:
|
||||
return DeleteEvent(seq, cast(JSON_OBJECT, delete))
|
||||
|
||||
error = cast(JSON_OBJECT | None, obj.get("Error"))
|
||||
if error is not None:
|
||||
return ErrorEvent(seq, cast(int, error["status"]), cast(str | None, error.get("message")))
|
||||
|
||||
raise Exception(f"Failed to parse event: {obj}")
|
||||
|
||||
|
||||
class Transport(ABC):
|
||||
@abstractmethod
|
||||
def fetch(
|
||||
@@ -639,23 +710,28 @@ class RecordApi:
|
||||
method="DELETE",
|
||||
)
|
||||
|
||||
def subscribe(self, recordId: RecordId | str | int) -> typing.Generator[JSON_OBJECT]:
|
||||
def subscribe(self, recordId: RecordId | str | int) -> typing.Generator[EVENT]:
|
||||
id = repr(recordId) if isinstance(recordId, RecordId) else f"{recordId}"
|
||||
context = self._client.stream(
|
||||
f"{self._recordApi}/{self._name}/subscribe/{id}", timeout=httpx.Timeout(None)
|
||||
)
|
||||
|
||||
def impl() -> typing.Generator[JSON_OBJECT]:
|
||||
def impl() -> typing.Generator[EVENT]:
|
||||
with context as response:
|
||||
if response.status_code > 200:
|
||||
raise FetchException(response.status_code, response.text)
|
||||
|
||||
for line in response.iter_lines():
|
||||
if line.startswith("data: "):
|
||||
yield json.loads(line.rstrip("\n")[6:])
|
||||
ev = parseEvent(json.loads(line.rstrip("\n")[6:]))
|
||||
if ev is not None:
|
||||
yield ev
|
||||
|
||||
return impl()
|
||||
|
||||
def subscribe_all(self) -> typing.Generator[EVENT]:
|
||||
return self.subscribe("*")
|
||||
|
||||
|
||||
def _refreshTokensImpl(transport: Transport, refreshToken: str) -> TokenState:
|
||||
response = transport.fetch(
|
||||
|
||||
Reference in New Issue
Block a user