add sses to userlog

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-04-03 17:14:16 +02:00
parent c9a9fddef1
commit d2b873acff
5 changed files with 72 additions and 37 deletions

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"embed"
"errors"
"fmt"
"io/fs"
"strings"
@@ -20,7 +19,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/leonelquinteros/gotext"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
)
//go:embed l10n/locale
@@ -56,7 +54,6 @@ type Converter struct {
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
machineAuthAPIKey string
serviceName string
registeredEvents map[string]events.Unmarshaller
translationPath string
// cached within one request not to query other service too much
@@ -67,13 +64,12 @@ type Converter struct {
}
// NewConverter returns a new Converter
func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string, registeredEvents map[string]events.Unmarshaller) *Converter {
func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string) *Converter {
return &Converter{
locale: loc,
gatewaySelector: gatewaySelector,
machineAuthAPIKey: machineAuthAPIKey,
serviceName: name,
registeredEvents: registeredEvents,
translationPath: translationPath,
spaces: make(map[string]*storageprovider.StorageSpace),
users: make(map[string]*user.User),
@@ -83,20 +79,8 @@ func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPI
}
// ConvertEvent converts an eventhistory event to an OC10Notification
func (c *Converter) ConvertEvent(event *ehmsg.Event) (OC10Notification, error) {
etype, ok := c.registeredEvents[event.Type]
if !ok {
// this should not happen
return OC10Notification{}, errors.New("eventtype not registered")
}
einterface, err := etype.Unmarshal(event.Event)
if err != nil {
// this shouldn't happen either
return OC10Notification{}, errors.New("cant unmarshal event")
}
switch ev := einterface.(type) {
func (c *Converter) ConvertEvent(eventid string, event interface{}) (OC10Notification, error) {
switch ev := event.(type) {
default:
return OC10Notification{}, fmt.Errorf("unknown event type: %T", ev)
// file related
@@ -104,31 +88,31 @@ func (c *Converter) ConvertEvent(event *ehmsg.Event) (OC10Notification, error) {
switch ev.FinishedStep {
case events.PPStepAntivirus:
res := ev.Result.(events.VirusscanResult)
return c.virusMessage(event.Id, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate)
return c.virusMessage(eventid, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate)
case events.PPStepPolicies:
return c.policiesMessage(event.Id, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now())
return c.policiesMessage(eventid, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now())
default:
return OC10Notification{}, fmt.Errorf("unknown postprocessing step: %s", ev.FinishedStep)
}
// space related
case events.SpaceDisabled:
return c.spaceMessage(event.Id, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceDeleted:
return c.spaceDeletedMessage(event.Id, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp)
return c.spaceDeletedMessage(eventid, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp)
case events.SpaceShared:
return c.spaceMessage(event.Id, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceUnshared:
return c.spaceMessage(event.Id, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceMembershipExpired:
return c.spaceMessage(event.Id, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt)
return c.spaceMessage(eventid, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt)
// share related
case events.ShareCreated:
return c.shareMessage(event.Id, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime))
return c.shareMessage(eventid, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime))
case events.ShareExpired:
return c.shareMessage(event.Id, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt)
return c.shareMessage(eventid, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt)
case events.ShareRemoved:
return c.shareMessage(event.Id, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp)
return c.shareMessage(eventid, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp)
}
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"net/http"
"github.com/cs3org/reva/v2/pkg/ctx"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
)
@@ -31,11 +32,20 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
return
}
conv := NewConverter(r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.registeredEvents)
conv := NewConverter(r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath)
resp := GetEventResponseOC10{}
for _, e := range evs {
noti, err := conv.ConvertEvent(e)
etype, ok := ul.registeredEvents[e.Type]
if !ok {
// this should not happen
}
einterface, err := etype.Unmarshal(e.Event)
if err != nil {
// this shouldn't happen either
}
noti, err := conv.ConvertEvent(e.Id, einterface)
if err != nil {
ul.log.Error().Err(err).Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to convert event")
continue
@@ -49,6 +59,31 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
w.Write(b)
}
// HandleSSE is the GET handler for events
func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) {
u, ok := ctx.ContextGetUser(r.Context())
if !ok {
w.WriteHeader(http.StatusInternalServerError)
return
}
uid := u.GetId().GetOpaqueId()
if uid == "" {
w.WriteHeader(http.StatusInternalServerError)
return
}
stream := ul.sse.CreateStream(uid)
stream.AutoReplay = false
// add stream to URL
q := r.URL.Query()
q.Set("stream", uid)
r.URL.RawQuery = q.Encode()
ul.sse.ServeHTTP(w, r)
}
// HandleDeleteEvents is the DELETE handler for events
func (ul *UserlogService) HandleDeleteEvents(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())

View File

@@ -21,6 +21,7 @@ import (
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/r3labs/sse/v2"
"go-micro.dev/v4/store"
"google.golang.org/grpc/metadata"
)
@@ -33,6 +34,7 @@ type UserlogService struct {
cfg *config.Config
historyClient ehsvc.EventHistoryService
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
sse *sse.Server
registeredEvents map[string]events.Unmarshaller
translationPath string
}
@@ -60,6 +62,7 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
cfg: o.Config,
historyClient: o.HistoryClient,
gatewaySelector: o.GatewaySelector,
sse: sse.New(),
registeredEvents: make(map[string]events.Unmarshaller),
}
@@ -68,9 +71,10 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
ul.registeredEvents[typ.String()] = e
}
ul.m.Route("/", func(r chi.Router) {
r.Get("/*", ul.HandleGetEvents)
r.Delete("/*", ul.HandleDeleteEvents)
ul.m.Route("/ocs/v2.php/apps/notifications/api/v1/notifications", func(r chi.Router) {
r.Get("/", ul.HandleGetEvents)
r.Get("/sse", ul.HandleSSE)
r.Delete("/", ul.HandleDeleteEvents)
})
go ul.MemorizeEvents(ch)
@@ -155,7 +159,7 @@ func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) {
// III) store the eventID for each user
for _, id := range users {
if err := ul.addEventsToUser(id, event.ID); err != nil {
if err := ul.addEventToUser(id, event); err != nil {
ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
continue
}
@@ -219,9 +223,14 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error {
})
}
func (ul *UserlogService) addEventsToUser(userid string, eventids ...string) error {
func (ul *UserlogService) addEventToUser(userid string, event events.Event) error {
loc := "en" // TODO: where to get the locale from?
ev, _ := NewConverter(loc, ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath).ConvertEvent(event.ID, event.Event)
b, _ := json.Marshal(ev)
ul.sse.Publish(userid, &sse.Event{Data: b})
return ul.alterUserEventList(userid, func(ids []string) []string {
return append(ids, eventids...)
return append(ids, event.ID)
})
}