diff --git a/services/eventhistory/pkg/service/service.go b/services/eventhistory/pkg/service/service.go index 7f972ad79e..b95a24d386 100644 --- a/services/eventhistory/pkg/service/service.go +++ b/services/eventhistory/pkg/service/service.go @@ -2,6 +2,7 @@ package service import ( "context" + "encoding/json" "fmt" "regexp" @@ -13,6 +14,13 @@ import ( "go-micro.dev/v4/store" ) +// StoreEvent is data structure in the store +type StoreEvent struct { + ID string + Type string + Event []byte +} + // EventHistoryService is the service responsible for event history type EventHistoryService struct { ch <-chan events.Event @@ -41,9 +49,18 @@ func NewEventHistoryService(cfg *config.Config, consumer events.Consumer, store // StoreEvents consumes all events and stores them in the store. Will block func (eh *EventHistoryService) StoreEvents() { for event := range eh.ch { + ev, err := json.Marshal(StoreEvent{ + ID: event.ID, + Type: event.Type, + Event: event.Event.([]byte), + }) + if err != nil { + eh.log.Error().Err(err).Str("eventid", event.ID).Msg("could not marshal event") + continue + } if err := eh.store.Write(&store.Record{ Key: event.ID, - Value: event.Event.([]byte), + Value: ev, Expiry: eh.cfg.Store.RecordExpiry, Metadata: map[string]interface{}{ "type": event.Type, @@ -51,7 +68,7 @@ func (eh *EventHistoryService) StoreEvents() { }); err != nil { // we can't store. That's it for us. eh.log.Error().Err(err).Str("eventid", event.ID).Msg("could not store event") - return + continue } } } @@ -59,19 +76,12 @@ func (eh *EventHistoryService) StoreEvents() { // GetEvents allows to retrieve events from the eventstore by id func (eh *EventHistoryService) GetEvents(ctx context.Context, req *ehsvc.GetEventsRequest, resp *ehsvc.GetEventsResponse) error { for _, id := range req.Ids { - evs, err := eh.store.Read(id) + ev, err := eh.getEvent(id) if err != nil { - if err != store.ErrNotFound { - eh.log.Error().Err(err).Str("eventid", id).Msg("could not read event") - } continue } - resp.Events = append(resp.Events, &ehmsg.Event{ - Id: id, - Event: evs[0].Value, - Type: evs[0].Metadata["type"].(string), - }) + resp.Events = append(resp.Events, ev) } return nil @@ -97,20 +107,37 @@ func (eh *EventHistoryService) GetEventsForUser(ctx context.Context, req *ehsvc. } for _, i := range idx { - e, err := eh.store.Read(i) + e, err := eh.getEvent(i) if err != nil { - eh.log.Error().Err(err).Str("eventid", i).Msg("could not read event") continue } - if userID.Match(e[0].Value) { - resp.Events = append(resp.Events, &ehmsg.Event{ - Id: i, - Event: e[0].Value, - Type: e[0].Metadata["type"].(string), - }) + if userID.Match(e.Event) { + resp.Events = append(resp.Events, e) } } return nil } + +func (eh *EventHistoryService) getEvent(id string) (*ehmsg.Event, error) { + evs, err := eh.store.Read(id) + if err != nil { + if err != store.ErrNotFound { + eh.log.Error().Err(err).Str("eventid", id).Msg("could not read event") + } + return nil, err + } + + var ev StoreEvent + if err := json.Unmarshal(evs[0].Value, &ev); err != nil { + eh.log.Error().Err(err).Str("eventid", id).Msg("could not unmarshal event") + return nil, err + } + + return &ehmsg.Event{ + Id: ev.ID, + Event: ev.Event, + Type: ev.Type, + }, nil +} diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index f3f028aad4..deda66be72 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -166,7 +166,7 @@ func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) { func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehmsg.Event, error) { rec, err := ul.store.Read(userid) if err != nil && err != store.ErrNotFound { - ul.log.Fatal().Err(err).Str("userid", userid).Msg("failed to read record from database") + ul.log.Error().Err(err).Str("userid", userid).Msg("failed to read record from store") return nil, err } @@ -177,7 +177,7 @@ func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehms var eventIDs []string if err := json.Unmarshal(rec[0].Value, &eventIDs); err != nil { - ul.log.Fatal().Err(err).Str("userid", userid).Msg("failed to umarshal record from database") + ul.log.Error().Err(err).Str("userid", userid).Msg("failed to umarshal record from store") return nil, err }