mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-04 19:29:49 -06:00
Merge pull request #6057 from kobergj/DontPanicInUserlog
[tests-only] Avoid Metadata Functionality in Eventhistory
This commit is contained in:
@@ -2,6 +2,7 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"regexp"
|
||||
|
||||
@@ -13,6 +14,13 @@ import (
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
// StoreEvent is data structure in the store
|
||||
type StoreEvent struct {
|
||||
ID string
|
||||
Type string
|
||||
Event []byte
|
||||
}
|
||||
|
||||
// EventHistoryService is the service responsible for event history
|
||||
type EventHistoryService struct {
|
||||
ch <-chan events.Event
|
||||
@@ -41,9 +49,18 @@ func NewEventHistoryService(cfg *config.Config, consumer events.Consumer, store
|
||||
// StoreEvents consumes all events and stores them in the store. Will block
|
||||
func (eh *EventHistoryService) StoreEvents() {
|
||||
for event := range eh.ch {
|
||||
ev, err := json.Marshal(StoreEvent{
|
||||
ID: event.ID,
|
||||
Type: event.Type,
|
||||
Event: event.Event.([]byte),
|
||||
})
|
||||
if err != nil {
|
||||
eh.log.Error().Err(err).Str("eventid", event.ID).Msg("could not marshal event")
|
||||
continue
|
||||
}
|
||||
if err := eh.store.Write(&store.Record{
|
||||
Key: event.ID,
|
||||
Value: event.Event.([]byte),
|
||||
Value: ev,
|
||||
Expiry: eh.cfg.Store.RecordExpiry,
|
||||
Metadata: map[string]interface{}{
|
||||
"type": event.Type,
|
||||
@@ -51,7 +68,7 @@ func (eh *EventHistoryService) StoreEvents() {
|
||||
}); err != nil {
|
||||
// we can't store. That's it for us.
|
||||
eh.log.Error().Err(err).Str("eventid", event.ID).Msg("could not store event")
|
||||
return
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -59,19 +76,12 @@ func (eh *EventHistoryService) StoreEvents() {
|
||||
// GetEvents allows to retrieve events from the eventstore by id
|
||||
func (eh *EventHistoryService) GetEvents(ctx context.Context, req *ehsvc.GetEventsRequest, resp *ehsvc.GetEventsResponse) error {
|
||||
for _, id := range req.Ids {
|
||||
evs, err := eh.store.Read(id)
|
||||
ev, err := eh.getEvent(id)
|
||||
if err != nil {
|
||||
if err != store.ErrNotFound {
|
||||
eh.log.Error().Err(err).Str("eventid", id).Msg("could not read event")
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
resp.Events = append(resp.Events, &ehmsg.Event{
|
||||
Id: id,
|
||||
Event: evs[0].Value,
|
||||
Type: evs[0].Metadata["type"].(string),
|
||||
})
|
||||
resp.Events = append(resp.Events, ev)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -97,20 +107,37 @@ func (eh *EventHistoryService) GetEventsForUser(ctx context.Context, req *ehsvc.
|
||||
}
|
||||
|
||||
for _, i := range idx {
|
||||
e, err := eh.store.Read(i)
|
||||
e, err := eh.getEvent(i)
|
||||
if err != nil {
|
||||
eh.log.Error().Err(err).Str("eventid", i).Msg("could not read event")
|
||||
continue
|
||||
}
|
||||
|
||||
if userID.Match(e[0].Value) {
|
||||
resp.Events = append(resp.Events, &ehmsg.Event{
|
||||
Id: i,
|
||||
Event: e[0].Value,
|
||||
Type: e[0].Metadata["type"].(string),
|
||||
})
|
||||
if userID.Match(e.Event) {
|
||||
resp.Events = append(resp.Events, e)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (eh *EventHistoryService) getEvent(id string) (*ehmsg.Event, error) {
|
||||
evs, err := eh.store.Read(id)
|
||||
if err != nil {
|
||||
if err != store.ErrNotFound {
|
||||
eh.log.Error().Err(err).Str("eventid", id).Msg("could not read event")
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ev StoreEvent
|
||||
if err := json.Unmarshal(evs[0].Value, &ev); err != nil {
|
||||
eh.log.Error().Err(err).Str("eventid", id).Msg("could not unmarshal event")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ehmsg.Event{
|
||||
Id: ev.ID,
|
||||
Event: ev.Event,
|
||||
Type: ev.Type,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -166,7 +166,7 @@ func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) {
|
||||
func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehmsg.Event, error) {
|
||||
rec, err := ul.store.Read(userid)
|
||||
if err != nil && err != store.ErrNotFound {
|
||||
ul.log.Fatal().Err(err).Str("userid", userid).Msg("failed to read record from database")
|
||||
ul.log.Error().Err(err).Str("userid", userid).Msg("failed to read record from store")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -177,7 +177,7 @@ func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehms
|
||||
|
||||
var eventIDs []string
|
||||
if err := json.Unmarshal(rec[0].Value, &eventIDs); err != nil {
|
||||
ul.log.Fatal().Err(err).Str("userid", userid).Msg("failed to umarshal record from database")
|
||||
ul.log.Error().Err(err).Str("userid", userid).Msg("failed to umarshal record from store")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user