mirror of
https://github.com/trailbaseio/trailbase.git
synced 2026-05-11 20:09:41 -05:00
Go client: support structured change event errors and sequence numbers.
This commit is contained in:
@@ -94,39 +94,6 @@ func NewTokenState(tokens *Tokens) (*TokenState, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
type ValueEvent interface {
|
||||
Value() *map[string]any
|
||||
}
|
||||
|
||||
type InsertEvent struct {
|
||||
value map[string]any
|
||||
}
|
||||
|
||||
func (ev *InsertEvent) Value() *map[string]any {
|
||||
return &ev.value
|
||||
}
|
||||
|
||||
type UpdateEvent struct {
|
||||
value map[string]any
|
||||
}
|
||||
|
||||
func (ev *UpdateEvent) Value() *map[string]any {
|
||||
return &ev.value
|
||||
}
|
||||
|
||||
type DeleteEvent struct {
|
||||
value map[string]any
|
||||
}
|
||||
|
||||
func (ev *DeleteEvent) Value() *map[string]any {
|
||||
return &ev.value
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
Value ValueEvent
|
||||
Error *string
|
||||
}
|
||||
|
||||
func NewClient(baseUrl string) (*Client, error) {
|
||||
return NewClientWithTokens(baseUrl, nil)
|
||||
}
|
||||
@@ -417,41 +384,13 @@ func (c *Client) stream(method string, path string, body []byte, queryParams []Q
|
||||
defer close(stream)
|
||||
|
||||
for scanner.Scan() {
|
||||
b := scanner.Bytes()
|
||||
if !bytes.HasPrefix(b, []byte("data: ")) {
|
||||
continue
|
||||
}
|
||||
|
||||
var evMap map[string]any
|
||||
err = json.Unmarshal(b[6:], &evMap)
|
||||
event, err := parseEvent(scanner.Bytes())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if val, ok := evMap["Error"]; ok {
|
||||
var errString string = val.(string)
|
||||
stream <- Event{
|
||||
Error: &errString,
|
||||
}
|
||||
continue
|
||||
} else if val, ok := evMap["Insert"]; ok {
|
||||
stream <- Event{
|
||||
Value: &InsertEvent{
|
||||
value: val.(map[string]any),
|
||||
},
|
||||
}
|
||||
} else if val, ok := evMap["Update"]; ok {
|
||||
stream <- Event{
|
||||
Value: &UpdateEvent{
|
||||
value: val.(map[string]any),
|
||||
},
|
||||
}
|
||||
} else if val, ok := evMap["Delete"]; ok {
|
||||
stream <- Event{
|
||||
Value: &DeleteEvent{
|
||||
value: val.(map[string]any),
|
||||
},
|
||||
}
|
||||
if event != nil {
|
||||
stream <- *event
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -0,0 +1,109 @@
|
||||
package trailbase
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
type ValueEvent interface {
|
||||
Value() *map[string]any
|
||||
}
|
||||
|
||||
type InsertEvent struct {
|
||||
value map[string]any
|
||||
}
|
||||
|
||||
func (ev *InsertEvent) Value() *map[string]any {
|
||||
return &ev.value
|
||||
}
|
||||
|
||||
type UpdateEvent struct {
|
||||
value map[string]any
|
||||
}
|
||||
|
||||
func (ev *UpdateEvent) Value() *map[string]any {
|
||||
return &ev.value
|
||||
}
|
||||
|
||||
type DeleteEvent struct {
|
||||
value map[string]any
|
||||
}
|
||||
|
||||
func (ev *DeleteEvent) Value() *map[string]any {
|
||||
return &ev.value
|
||||
}
|
||||
|
||||
type ErrorEvent struct {
|
||||
Status int64
|
||||
Message *string
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
Seq *int64
|
||||
Value ValueEvent
|
||||
Error *ErrorEvent
|
||||
}
|
||||
|
||||
func parseEvent(msg []byte) (*Event, error) {
|
||||
if !bytes.HasPrefix(msg, []byte("data: ")) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var evMap map[string]any
|
||||
err := json.Unmarshal(msg[6:], &evMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var seq *int64 = nil
|
||||
seqf, ok := evMap["seq"].(float64)
|
||||
if ok {
|
||||
seqi := int64(seqf)
|
||||
seq = &seqi
|
||||
}
|
||||
|
||||
if val, ok := evMap["Error"]; ok {
|
||||
var errObj = val.(map[string]any)
|
||||
var msg, ok = errObj["message"].(string)
|
||||
if ok {
|
||||
return &Event{
|
||||
Seq: seq,
|
||||
Error: &ErrorEvent{
|
||||
Status: int64(errObj["status"].(float64)),
|
||||
Message: &msg,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &Event{
|
||||
Seq: seq,
|
||||
Error: &ErrorEvent{
|
||||
Status: int64(errObj["status"].(float64)),
|
||||
},
|
||||
}, nil
|
||||
} else if val, ok := evMap["Insert"]; ok {
|
||||
return &Event{
|
||||
Seq: seq,
|
||||
Value: &InsertEvent{
|
||||
value: val.(map[string]any),
|
||||
},
|
||||
}, nil
|
||||
} else if val, ok := evMap["Update"]; ok {
|
||||
return &Event{
|
||||
Seq: seq,
|
||||
Value: &UpdateEvent{
|
||||
value: val.(map[string]any),
|
||||
},
|
||||
}, nil
|
||||
} else if val, ok := evMap["Delete"]; ok {
|
||||
return &Event{
|
||||
Seq: seq,
|
||||
Value: &DeleteEvent{
|
||||
value: val.(map[string]any),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package trailbase
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -22,7 +23,7 @@ func TestFilter(t *testing.T) {
|
||||
Value: "value",
|
||||
}.toParams("filter")
|
||||
expected0 := []QueryParam{
|
||||
QueryParam{key: "filter[col]", value: "value"},
|
||||
{key: "filter[col]", value: "value"},
|
||||
}
|
||||
if !testEq(got0, expected0) {
|
||||
t.Fatal("unexpected filter, got:", got0, " expected: ", expected0)
|
||||
@@ -51,12 +52,93 @@ func TestFilter(t *testing.T) {
|
||||
},
|
||||
}.toParams("filter")
|
||||
expected1 := []QueryParam{
|
||||
QueryParam{key: "filter[$and][0][col0]", value: "val0"},
|
||||
QueryParam{key: "filter[$and][1][$or][0][col1][$ne]", value: "val1"},
|
||||
QueryParam{key: "filter[$and][1][$or][1][col2][$lt]", value: "val2"},
|
||||
{key: "filter[$and][0][col0]", value: "val0"},
|
||||
{key: "filter[$and][1][$or][0][col1][$ne]", value: "val1"},
|
||||
{key: "filter[$and][1][$or][1][col2][$lt]", value: "val2"},
|
||||
}
|
||||
|
||||
if !testEq(got1, expected1) {
|
||||
t.Fatal("unexpected filter, got:", got1, " expected: ", expected1)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventParsing(t *testing.T) {
|
||||
{
|
||||
errJson := `
|
||||
{
|
||||
"Error": {
|
||||
"status": 1,
|
||||
"message": "test"
|
||||
},
|
||||
"seq": 3
|
||||
}`
|
||||
|
||||
errEvent, err := parseEvent(fmt.Append([]byte("data: "), errJson))
|
||||
if err != nil {
|
||||
t.Fatal("Got err", err)
|
||||
}
|
||||
if errEvent == nil {
|
||||
t.Fatal("Expected event, got nil")
|
||||
}
|
||||
|
||||
msg := errEvent.Error.Message
|
||||
if *msg != "test" {
|
||||
t.Fatal("Expected message is 'test'")
|
||||
}
|
||||
|
||||
if *errEvent.Seq != 3 {
|
||||
t.Fatal("Expected Seq of 3, got:", errEvent.Seq)
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
errJson := `
|
||||
{
|
||||
"Error": {
|
||||
"status": 1
|
||||
}
|
||||
}`
|
||||
|
||||
errEvent, err := parseEvent(fmt.Append([]byte("data: "), errJson))
|
||||
if err != nil {
|
||||
t.Fatal("Got err", err)
|
||||
}
|
||||
if errEvent == nil {
|
||||
t.Fatal("Expected event, got nil")
|
||||
}
|
||||
|
||||
msg := errEvent.Error.Message
|
||||
if msg != nil {
|
||||
t.Fatal("expected nil message, got ", msg)
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
updateJson := `
|
||||
{
|
||||
"Update": {
|
||||
"col0": 5
|
||||
},
|
||||
"seq": 4
|
||||
}`
|
||||
|
||||
updateEvent, err := parseEvent(fmt.Append([]byte("data: "), updateJson))
|
||||
if err != nil {
|
||||
t.Fatal("Got err", err)
|
||||
}
|
||||
if updateEvent == nil {
|
||||
t.Fatal("Expected event, got nil")
|
||||
}
|
||||
|
||||
if updateEvent.Error != nil {
|
||||
t.Fatal("expected event got error", updateEvent.Error)
|
||||
}
|
||||
|
||||
if *updateEvent.Seq != 4 {
|
||||
t.Fatal("expected Seq=4")
|
||||
}
|
||||
if updateEvent.Value == nil {
|
||||
t.Fatal("expected update value")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user