sharpen userlog service

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-02-08 12:38:04 +01:00
parent d7f57f3a50
commit a9561d85c8
17 changed files with 1027 additions and 93 deletions

View File

@@ -79,6 +79,7 @@ config = {
"services/storage-users",
"services/store",
"services/thumbnails",
"services/userlog",
"services/users",
"services/web",
"services/webdav",

View File

@@ -101,6 +101,11 @@ func DefaultPolicies() []config.Policy {
Endpoint: "/archiver",
Service: "com.owncloud.web.frontend",
},
{
// reroute oc10 notifications endpoint to userlog service
Endpoint: "/ocs/v2.php/apps/notifications/api/v1/notifications",
Service: "com.owncloud.userlog.userlog",
},
{
Type: config.RegexRoute,
Endpoint: "/ocs/v[12].php/cloud/user/signing-key", // only `user/signing-key` is left in ocis-ocs
@@ -202,10 +207,6 @@ func DefaultPolicies() []config.Policy {
Endpoint: "/api/v0/settings",
Service: "com.owncloud.web.settings",
},
{
Endpoint: "/api/v0/activities",
Service: "com.owncloud.userlog.userlog",
},
},
},
}

View File

@@ -24,7 +24,8 @@ docs-generate: config-docs-generate
include ../../.make/generate.mk
.PHONY: ci-go-generate
ci-go-generate: # CI runs ci-node-generate automatically before this target
ci-go-generate: $(MOCKERY) # CI runs ci-node-generate automatically before this target
$(MOCKERY) --dir ../../protogen/gen/ocis/services/eventhistory/v0 --case underscore --name EventHistoryService
.PHONY: ci-node-generate
ci-node-generate:

View File

@@ -1,11 +1,33 @@
# Userlog service
# Userlog Service
The `userlog` service provides a way to configure which events a user wants to be informed about and an API to retrieve them.
The `userlog` service is a mediator between the `eventhistory` service and clients who want to be informed about user related events. It provides an API to retrieve those.
## Prerequisites
Running the `userlog` service without running the `eventhistory` service is not possible.
## Storing
The `userlog` service persists information via the configured store in `USERLOG_STORE_TYPE`. Possible stores are:
- `mem`: Basic in-memory store and the default.
- `ocmem`: Advanced in-memory store allowing max size.
- `redis`: Stores data in a configured redis cluster.
- `etcd`: Stores data in a configured etcd cluster.
- `nats-js`: Stores data using key-value-store feature of [nats jetstream](https://docs.nats.io/nats-concepts/jetstream/key-value-store)
- `noop`: Stores nothing. Useful for testing. Not recommended in productive enviroments.
1. Note that in-memory stores are by nature not reboot persistent.
2. Though usually not necessary, a database name and a database table can be configured for event stores if the event store supports this. Generally not applicapable for stores of type `in-memory`. These settings are blank by default which means that the standard settings of the configured store applies.
3. The userlog service can be scaled if not using `in-memory` stores and the stores are configured identically over all instances.
## Configuring
The `userlog` service has hardcoded configuration for now.
For the time being, the configuration which user related events are of interest is hardcoded and cannot be changed.
## Retrieving
The `userlog` service provides an API to retrieve configured events.
The `userlog` service provides an API to retrieve configured events. For now, this API is mostly following the [oc10 notification GET API](https://doc.owncloud.com/server/next/developer_manual/core/apis/ocs-notification-endpoint-v1.html#get-user-notifications).
## Deleting
To delete events for an user, use a `DELETE` request to `ocs/v2.php/apps/notifications/api/v1/notifications` containing the IDs to delete.

View File

@@ -0,0 +1,63 @@
// Code generated by mockery v2.14.1. DO NOT EDIT.
package mocks
import (
context "context"
client "go-micro.dev/v4/client"
mock "github.com/stretchr/testify/mock"
v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
)
// EventHistoryService is an autogenerated mock type for the EventHistoryService type
type EventHistoryService struct {
mock.Mock
}
// GetEvents provides a mock function with given fields: ctx, in, opts
func (_m *EventHistoryService) GetEvents(ctx context.Context, in *v0.GetEventsRequest, opts ...client.CallOption) (*v0.GetEventsResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *v0.GetEventsResponse
if rf, ok := ret.Get(0).(func(context.Context, *v0.GetEventsRequest, ...client.CallOption) *v0.GetEventsResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*v0.GetEventsResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *v0.GetEventsRequest, ...client.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
type mockConstructorTestingTNewEventHistoryService interface {
mock.TestingT
Cleanup(func())
}
// NewEventHistoryService creates a new instance of EventHistoryService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewEventHistoryService(t mockConstructorTestingTNewEventHistoryService) *EventHistoryService {
mock := &EventHistoryService{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -43,7 +43,7 @@ type SutureService struct {
// NewSutureService creates a new userlog.SutureService
func NewSutureService(cfg *ociscfg.Config) suture.Service {
cfg.Notifications.Commons = cfg.Commons
cfg.Userlog.Commons = cfg.Commons
return SutureService{
cfg: cfg.Userlog,
}

View File

@@ -3,25 +3,58 @@ package command
import (
"context"
"fmt"
"strings"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/store"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/userlog/pkg/logging"
"github.com/owncloud/ocis/v2/services/userlog/pkg/metrics"
"github.com/owncloud/ocis/v2/services/userlog/pkg/server/http"
"github.com/urfave/cli/v2"
"go-micro.dev/v4/store"
)
// all events we care about
var _registeredEvents = []events.Unmarshaller{
// file related
events.UploadReady{},
events.ContainerCreated{},
events.FileTouched{},
events.FileDownloaded{},
events.FileVersionRestored{},
events.ItemMoved{},
events.ItemTrashed{},
events.ItemPurged{},
events.ItemRestored{},
// space related
events.SpaceCreated{},
events.SpaceRenamed{},
events.SpaceEnabled{},
events.SpaceDisabled{},
events.SpaceDeleted{},
events.SpaceShared{},
events.SpaceUnshared{},
events.SpaceUpdated{},
events.SpaceMembershipExpired{},
// share related
events.ShareCreated{},
// events.ShareRemoved{}, // TODO: ShareRemoved doesn't hold sharee information
events.ShareUpdated{},
events.ShareExpired{},
events.LinkCreated{},
// events.LinkRemoved{}, // TODO: LinkRemoved doesn't hold sharee information
events.LinkUpdated{},
}
// Server is the entrypoint for the server command.
@@ -48,7 +81,9 @@ func Server(cfg *config.Config) *cli.Command {
}
return context.WithCancel(cfg.Context)
}()
mtrcs := metrics.New()
mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1)
defer cancel()
@@ -57,15 +92,27 @@ func Server(cfg *config.Config) *cli.Command {
return err
}
var st store.Store
switch cfg.Store.Type {
case "inmemory":
st = store.NewMemoryStore()
default:
return fmt.Errorf("unknown store '%s' configured", cfg.Store.Type)
st := store.Create(
store.Type(cfg.Store.Type),
store.Addresses(strings.Split(cfg.Store.Addresses, ",")...),
store.Database(cfg.Store.Database),
store.Table(cfg.Store.Table),
)
tm, err := pool.StringToTLSMode(cfg.GRPCClientTLS.Mode)
if err != nil {
return err
}
gwclient, err := pool.GetGatewayServiceClient(
cfg.RevaGateway,
pool.WithTLSCACert(cfg.GRPCClientTLS.CACert),
pool.WithTLSMode(tm),
)
if err != nil {
return fmt.Errorf("could not get reva client: %s", err)
}
mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1)
hClient := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", grpc.DefaultClient())
{
server, err := http.Server(
@@ -75,6 +122,8 @@ func Server(cfg *config.Config) *cli.Command {
http.Metrics(mtrcs),
http.Store(st),
http.Consumer(consumer),
http.Gateway(gwclient),
http.History(hClient),
http.RegisteredEvents(_registeredEvents),
)

View File

@@ -2,7 +2,6 @@ package config
import (
"context"
"time"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
)
@@ -19,16 +18,21 @@ type Config struct {
HTTP HTTP `yaml:"http"`
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
Events Events `yaml:"events"`
Store Store `yaml:"store"`
MachineAuthAPIKey string `yaml:"machine_auth_api_key" env:"OCIS_MACHINE_AUTH_API_KEY;USERLOG_MACHINE_AUTH_API_KEY" desc:"Machine auth API key used to validate internal requests necessary to access resources from other services."`
RevaGateway string `yaml:"reva_gateway" env:"REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata"`
Events Events `yaml:"events"`
Store Store `yaml:"store"`
Context context.Context `yaml:"-"`
}
// Store configures the store to use
type Store struct {
Type string `yaml:"type" env:"USERLOG_STORE_TYPE" desc:"The type of the store. Supported is inmemory"`
RecordExpiry time.Duration `yaml:"record_expiry" env:"USERLOG_RECORD_EXPIRY" desc:"time to life for events in the store"`
Type string `yaml:"type" env:"USERLOG_STORE_TYPE" desc:"The type of the userlog store. Supported values are: 'mem', 'ocmem', 'etcd', 'redis', 'nats-js', 'noop'. See the text description for details."`
Addresses string `yaml:"addresses" env:"USERLOG_STORE_ADDRESSES" desc:"A comma separated list of addresses to access the configured store. This has no effect when 'in-memory' stores are configured. Note that the behaviour how addresses are used is dependent on the library of the configured store."`
Database string `yaml:"database" env:"USERLOG_STORE_DATABASE" desc:"(optional) The database name the configured store should use. This has no effect when 'in-memory' stores are configured."`
Table string `yaml:"table" env:"USERLOG_STORE_TABLE" desc:"(optional) The database table the store should use. This has no effect when 'in-memory' stores are configured."`
Size int `yaml:"size" env:"USERLOG_STORE_SIZE" desc:"The maximum quantity of items in the store. Only applies when store type 'ocmem' is configured. Defaults to 512."`
}
// Events combines the configuration options for the event bus.

View File

@@ -27,8 +27,9 @@ func DefaultConfig() *config.Config {
EnableTLS: false,
},
Store: config.Store{
Type: "inmemory",
Type: "mem",
},
RevaGateway: shared.DefaultRevaConfig().Address,
HTTP: config.HTTP{
Addr: "127.0.0.1:0",
Root: "/",
@@ -57,6 +58,10 @@ func EnsureDefaults(cfg *config.Config) {
cfg.Log = &config.Log{}
}
if cfg.MachineAuthAPIKey == "" && cfg.Commons != nil && cfg.Commons.MachineAuthAPIKey != "" {
cfg.MachineAuthAPIKey = cfg.Commons.MachineAuthAPIKey
}
if cfg.GRPCClientTLS == nil {
cfg.GRPCClientTLS = &shared.GRPCClientTLS{}
if cfg.Commons != nil && cfg.Commons.GRPCClientTLS != nil {

View File

@@ -4,6 +4,7 @@ import (
"errors"
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config/defaults"
@@ -34,5 +35,9 @@ func ParseConfig(cfg *config.Config) error {
// Validate validates the config
func Validate(cfg *config.Config) error {
if cfg.MachineAuthAPIKey == "" {
return shared.MissingMachineAuthApiKeyError(cfg.Service.Name)
}
return nil
}

View File

@@ -3,8 +3,10 @@ package http
import (
"context"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/metrics"
"github.com/urfave/cli/v2"
@@ -24,6 +26,8 @@ type Options struct {
Namespace string
Store store.Store
Consumer events.Consumer
GatewayClient gateway.GatewayAPIClient
HistoryClient ehsvc.EventHistoryService
RegisteredEvents []events.Unmarshaller
}
@@ -94,6 +98,20 @@ func Consumer(consumer events.Consumer) Option {
}
}
// Gateway provides a function to configure the gateway client
func Gateway(gw gateway.GatewayAPIClient) Option {
return func(o *Options) {
o.GatewayClient = gw
}
}
// History provides a function to configure the event history client
func History(h ehsvc.EventHistoryService) Option {
return func(o *Options) {
o.HistoryClient = h
}
}
// RegisteredEvents provides a function to register events
func RegisteredEvents(evs []events.Unmarshaller) Option {
return func(o *Options) {

View File

@@ -3,6 +3,11 @@ package http
import (
"fmt"
stdhttp "net/http"
"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/owncloud/ocis/v2/ocis-pkg/middleware"
"github.com/owncloud/ocis/v2/ocis-pkg/service/http"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
svc "github.com/owncloud/ocis/v2/services/userlog/pkg/service"
@@ -34,27 +39,34 @@ func Server(opts ...Option) (http.Service, error) {
return http.Service{}, fmt.Errorf("could not initialize http service: %w", err)
}
//middlewares := []func(stdhttp.Handler) stdhttp.Handler{
//middleware.TraceContext,
//chimiddleware.RequestID,
//middleware.Version(
//"userlog",
//version.GetString(),
//),
//middleware.Logger(
//options.Logger,
//),
//}
handle, err := svc.NewUserlogService(options.Config, options.Consumer, options.Store, options.RegisteredEvents)
if err != nil {
return http.Service{}, err
middlewares := []func(stdhttp.Handler) stdhttp.Handler{
middleware.TraceContext,
chimiddleware.RequestID,
middleware.Version(
"userlog",
version.GetString(),
),
middleware.Logger(
options.Logger,
),
middleware.ExtractAccountUUID(),
}
{
//handle = svc.NewInstrument(handle, options.Metrics)
//handle = svc.NewLogging(handle, options.Logger)
//handle = svc.NewTracing(handle)
mux := chi.NewMux()
mux.Use(middlewares...)
handle, err := svc.NewUserlogService(
svc.Logger(options.Logger),
svc.Consumer(options.Consumer),
svc.Mux(mux),
svc.Store(options.Store),
svc.Config(options.Config),
svc.HistoryClient(options.HistoryClient),
svc.GatewayClient(options.GatewayClient),
svc.RegisteredEvents(options.RegisteredEvents),
)
if err != nil {
return http.Service{}, err
}
if err := micro.RegisterHandler(service.Server(), handle); err != nil {

View File

@@ -0,0 +1,233 @@
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/events"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
)
// ServeHTTP fulfills Handler interface
func (ul *UserlogService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ul.m.ServeHTTP(w, r)
}
// HandleGetEvents is the GET handler for events
func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())
if !ok {
ul.log.Error().Int("returned statuscode", http.StatusUnauthorized).Msg("user unauthorized")
w.WriteHeader(http.StatusUnauthorized)
return
}
evs, err := ul.GetEvents(r.Context(), u.GetId().GetOpaqueId())
if err != nil {
ul.log.Error().Err(err).Int("returned statuscode", http.StatusInternalServerError).Msg("get events failed")
w.WriteHeader(http.StatusInternalServerError)
return
}
resp := GetEventResponseOC10{}
for _, e := range evs {
noti, err := ul.convertEvent(r.Context(), e)
if err != nil {
ul.log.Error().Err(err).Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to convert event")
continue
}
resp.OCS.Data = append(resp.OCS.Data, noti)
}
resp.OCS.Meta.StatusCode = http.StatusOK
b, _ := json.Marshal(resp)
w.Write(b)
}
// HandleDeleteEvents is the DELETE handler for events
func (ul *UserlogService) HandleDeleteEvents(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())
if !ok {
ul.log.Error().Int("returned statuscode", http.StatusUnauthorized).Msg("user unauthorized")
w.WriteHeader(http.StatusUnauthorized)
return
}
var ids []string
if err := json.NewDecoder(r.Body).Decode(&ids); err != nil {
ul.log.Error().Err(err).Int("returned statuscode", http.StatusBadRequest).Msg("request body is malformed")
w.WriteHeader(http.StatusBadRequest)
return
}
if err := ul.DeleteEvents(u.GetId().GetOpaqueId(), ids); err != nil {
ul.log.Error().Err(err).Int("returned statuscode", http.StatusInternalServerError).Msg("delete events failed")
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
func (ul *UserlogService) convertEvent(ctx context.Context, event *ehmsg.Event) (OC10Notification, error) {
etype, ok := ul.registeredEvents[event.Type]
if !ok {
// this should not happen
return OC10Notification{}, errors.New("eventtype not registered")
}
einterface, err := etype.Unmarshal(event.Event)
if err != nil {
// this shouldn't happen either
return OC10Notification{}, errors.New("cant unmarshal event")
}
noti := OC10Notification{
EventID: event.Id,
Service: "userlog",
Timestamp: time.Now().Format(time.RFC3339Nano),
}
switch ev := einterface.(type) {
// file related
case events.UploadReady:
space, _ := ul.getSpace(ctx, ev.FileRef.GetResourceId().GetSpaceId())
noti.UserID = ev.ExecutingUser.GetId().GetOpaqueId()
noti.Subject = "File uploaded"
noti.Message = fmt.Sprintf("File '%s' was uploaded to space '%s' by user '%s'", ev.Filename, space.GetName(), ev.ExecutingUser.GetUsername())
case events.ContainerCreated:
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Folder created"
noti.Message = fmt.Sprintf("Folder '%s' was created", ev.Ref.GetPath())
case events.FileTouched:
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "File touched"
noti.Message = fmt.Sprintf("File '%s' was touched", ev.Ref.GetPath())
case events.FileDownloaded:
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "File downloaded"
noti.Message = fmt.Sprintf("File '%s' was downloaded", ev.Ref.GetPath())
case events.FileVersionRestored:
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "File version restored"
noti.Message = fmt.Sprintf("An older version of file '%s' was restored", ev.Ref.GetPath())
case events.ItemMoved:
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "File moved"
noti.Message = fmt.Sprintf("File '%s' was moved from '%s'", ev.Ref.GetPath(), ev.OldReference.GetPath())
case events.ItemTrashed:
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "File trashed"
noti.Message = fmt.Sprintf("File '%s' was trashed", ev.Ref.GetPath())
case events.ItemPurged:
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "File purged"
noti.Message = fmt.Sprintf("File '%s' was purged", ev.Ref.GetPath())
case events.ItemRestored:
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "File restored"
noti.Message = fmt.Sprintf("File '%s' was restored", ev.Ref.GetPath())
// space related
case events.SpaceCreated:
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Space created"
noti.Message = fmt.Sprintf("Space '%s' was created", ev.Name)
case events.SpaceRenamed:
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Space renamed"
noti.Message = fmt.Sprintf("Space '%s' was renamed", ev.Name)
case events.SpaceEnabled:
space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId())
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Space enabled"
noti.Message = fmt.Sprintf("Space '%s' was renamed", space.Name)
case events.SpaceDisabled:
space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId())
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Space disabled"
noti.Message = fmt.Sprintf("Space '%s' was disabled", space.Name)
case events.SpaceDeleted:
space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId())
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Space deleted"
noti.Message = fmt.Sprintf("Space '%s' was deleted", space.Name)
case events.SpaceShared:
space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId())
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Space shared"
noti.Message = fmt.Sprintf("Space '%s' was shared", space.Name)
case events.SpaceUnshared:
space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId())
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Space unshared"
noti.Message = fmt.Sprintf("Space '%s' was unshared", space.Name)
case events.SpaceUpdated:
space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId())
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Space updated"
noti.Message = fmt.Sprintf("Space '%s' was updated", space.Name)
case events.SpaceMembershipExpired:
space, _ := ul.getSpace(ctx, ev.SpaceID.GetOpaqueId())
noti.UserID = ""
noti.Subject = "Space membership expired"
noti.Message = fmt.Sprintf("A spacemembership for space '%s' has expired", space.Name)
// share related
case events.ShareCreated:
space, _ := ul.getSpace(ctx, ev.ItemID.GetSpaceId())
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Share received"
noti.Message = fmt.Sprintf("A file was shared in space %s", space.Name)
case events.ShareUpdated:
space, _ := ul.getSpace(ctx, ev.ItemID.GetSpaceId())
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Share updated"
noti.Message = fmt.Sprintf("A share was updated in space %s", space.Name)
case events.ShareExpired:
space, _ := ul.getSpace(ctx, ev.ItemID.GetSpaceId())
noti.Subject = "Share expired"
noti.Message = fmt.Sprintf("A share has expired in space %s", space.Name)
case events.LinkCreated:
space, _ := ul.getSpace(ctx, ev.ItemID.GetSpaceId())
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Share received"
noti.Message = fmt.Sprintf("A link was created in space %s", space.Name)
case events.LinkUpdated:
space, _ := ul.getSpace(ctx, ev.ItemID.GetSpaceId())
noti.UserID = ev.Executant.GetOpaqueId()
noti.Subject = "Share received"
noti.Message = fmt.Sprintf("A link was updated in space %s", space.Name)
}
return noti, nil
}
// OC10Notification is the oc10 style representation of an event
// some fields are left out for simplicity
type OC10Notification struct {
EventID string `json:"notification_id"`
Service string `json:"app"`
Timestamp string `json:"datetime"`
UserID string `json:"user"`
Subject string `json:"subject"`
Message string `json:"message"`
}
// GetEventResponseOC10 is the response from GET events endpoint in oc10 style
type GetEventResponseOC10 struct {
OCS struct {
Meta struct {
Message string `json:"message"`
Status string `json:"status"`
StatusCode int `json:"statuscode"`
} `json:"meta"`
Data []OC10Notification `json:"data"`
} `json:"ocs"`
}

View File

@@ -0,0 +1,82 @@
package service
import (
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/go-chi/chi/v5"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"go-micro.dev/v4/store"
)
// Option for the userlog service
type Option func(*Options)
// Options for the userlog service
type Options struct {
Logger log.Logger
Consumer events.Consumer
Mux *chi.Mux
Store store.Store
Config *config.Config
HistoryClient ehsvc.EventHistoryService
GatewayClient gateway.GatewayAPIClient
RegisteredEvents []events.Unmarshaller
}
// Logger configures a logger for the userlog service
func Logger(log log.Logger) Option {
return func(o *Options) {
o.Logger = log
}
}
// Consumer configures an event consumer for the userlog service
func Consumer(c events.Consumer) Option {
return func(o *Options) {
o.Consumer = c
}
}
// Mux defines the muxer for the userlog service
func Mux(m *chi.Mux) Option {
return func(o *Options) {
o.Mux = m
}
}
// Store defines the store for the userlog service
func Store(s store.Store) Option {
return func(o *Options) {
o.Store = s
}
}
// Config adds the config for the userlog service
func Config(c *config.Config) Option {
return func(o *Options) {
o.Config = c
}
}
// HistoryClient adds a grpc client for the eventhistory service
func HistoryClient(hc ehsvc.EventHistoryService) Option {
return func(o *Options) {
o.HistoryClient = hc
}
}
// GatewayClient adds a grpc client for the gateway service
func GatewayClient(gwc gateway.GatewayAPIClient) Option {
return func(o *Options) {
o.GatewayClient = gwc
}
}
// RegisteredEvents registers the events the service should listen to
func RegisteredEvents(e []events.Unmarshaller) Option {
return func(o *Options) {
o.RegisteredEvents = e
}
}

View File

@@ -3,51 +3,74 @@ package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-chi/chi/v5"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"go-micro.dev/v4/store"
)
// Comment when you read this on review
var _adminid = "2502d8b8-a5e7-4ab3-b858-5aafae4a64a2"
// UserlogService is the service responsible for user activities
type UserlogService struct {
log log.Logger
ch <-chan events.Event
m *chi.Mux
store store.Store
cfg *config.Config
historyClient ehsvc.EventHistoryService
gwClient gateway.GatewayAPIClient
registeredEvents map[string]events.Unmarshaller
}
// NewUserlogService returns an EventHistory service
func NewUserlogService(cfg *config.Config, consumer events.Consumer, store store.Store, registeredEvents []events.Unmarshaller) (*UserlogService, error) {
if consumer == nil || store == nil {
return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", consumer, store)
func NewUserlogService(opts ...Option) (*UserlogService, error) {
o := &Options{}
for _, opt := range opts {
opt(o)
}
ch, err := events.Consume(consumer, "userlog", registeredEvents...)
if o.Consumer == nil || o.Store == nil {
return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", o.Consumer, o.Store)
}
ch, err := events.Consume(o.Consumer, "userlog", o.RegisteredEvents...)
if err != nil {
return nil, err
}
grpcClient := grpc.DefaultClient()
grpcClient.Options()
c := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", grpcClient)
ul := &UserlogService{
log: o.Logger,
ch: ch,
m: o.Mux,
store: o.Store,
cfg: o.Config,
historyClient: o.HistoryClient,
gwClient: o.GatewayClient,
registeredEvents: make(map[string]events.Unmarshaller),
}
ul := &UserlogService{ch: ch, store: store, cfg: cfg, historyClient: c, registeredEvents: make(map[string]events.Unmarshaller)}
for _, e := range registeredEvents {
for _, e := range o.RegisteredEvents {
typ := reflect.TypeOf(e)
ul.registeredEvents[typ.String()] = e
}
ul.m.Route("/", func(r chi.Router) {
r.Get("/*", ul.HandleGetEvents)
r.Delete("/*", ul.HandleDeleteEvents)
})
go ul.MemorizeEvents()
return ul, nil
@@ -56,18 +79,83 @@ func NewUserlogService(cfg *config.Config, consumer events.Consumer, store store
// MemorizeEvents stores eventIDs a user wants to receive
func (ul *UserlogService) MemorizeEvents() {
for event := range ul.ch {
switch event.Event.(type) {
// for each event we need to:
// I) find users eligible to receive the event
var (
users []string
err error
)
switch e := event.Event.(type) {
default:
// for each event type we need to:
err = errors.New("unhandled event")
// I) find users eligible to receive the event
// file related
case events.UploadReady:
users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.FileRef.GetResourceId().GetSpaceId(), viewer)
case events.ContainerCreated:
users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), viewer)
case events.FileTouched:
users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), viewer)
case events.FileDownloaded:
users, err = ul.findSpaceMembers(ul.impersonate(e.Owner), e.Ref.GetResourceId().GetSpaceId(), viewer) // no space owner in event
case events.FileVersionRestored:
users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), editor)
case events.ItemMoved:
users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), viewer)
case events.ItemTrashed:
users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), viewer)
case events.ItemPurged:
users, err = ul.findSpaceMembers(ul.impersonate(e.Owner), e.Ref.GetResourceId().GetSpaceId(), editor) // no space owner in event
case events.ItemRestored:
users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), viewer)
// II) filter users who want to receive the event
// space related // TODO: how to find spaceadmins?
case events.SpaceCreated:
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), viewer)
case events.SpaceRenamed:
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), viewer)
case events.SpaceEnabled:
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), viewer)
case events.SpaceDisabled:
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), manager)
case events.SpaceDeleted:
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), manager)
case events.SpaceShared:
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), manager)
case events.SpaceUnshared:
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), manager)
case events.SpaceUpdated:
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), manager)
case events.SpaceMembershipExpired:
users, err = ul.resolveShare(ul.impersonate(e.SpaceOwner), e.GranteeUserID, e.GranteeGroupID, e.SpaceID.GetOpaqueId())
// III) store the eventID for each user
// share related
case events.ShareCreated:
users, err = ul.resolveShare(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID, e.ItemID.GetSpaceId())
case events.ShareUpdated:
users, err = ul.resolveShare(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID, e.ItemID.GetSpaceId())
case events.ShareExpired:
users, err = ul.resolveShare(ul.impersonate(e.ShareOwner), e.GranteeUserID, e.GranteeGroupID, e.ItemID.GetSpaceId())
case events.LinkCreated:
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ItemID.GetOpaqueId(), editor)
case events.LinkUpdated:
users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ItemID.GetOpaqueId(), editor)
// TEMP TESTING CODE
if err := ul.addEventToUser(_adminid, event.ID); err != nil {
}
if err != nil {
ul.log.Error().Err(err).Interface("event", event).Msg("error gathering members for event")
continue
}
// II) filter users who want to receive the event
// This step is postponed for later.
// For now each user should get all events she is eligible to receive
// III) store the eventID for each user
for _, id := range users {
if err := ul.addEventsToUser(id, event.ID); err != nil {
ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
continue
}
}
@@ -75,20 +163,21 @@ func (ul *UserlogService) MemorizeEvents() {
}
// GetEvents allows to retrieve events from the eventhistory by userid
func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]interface{}, error) {
func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehmsg.Event, error) {
rec, err := ul.store.Read(userid)
if err != nil {
if err != nil && err != store.ErrNotFound {
ul.log.Fatal().Err(err).Str("userid", userid).Msg("failed to read record from database")
return nil, err
}
if len(rec) == 0 {
// no events available
return []interface{}{}, nil
return []*ehmsg.Event{}, nil
}
var eventIDs []string
if err := json.Unmarshal(rec[0].Value, &eventIDs); err != nil {
// this should never happen
ul.log.Fatal().Err(err).Str("userid", userid).Msg("failed to umarshal record from database")
return nil, err
}
@@ -97,38 +186,74 @@ func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]inter
return nil, err
}
var events []interface{}
for _, e := range resp.Events {
ev, ok := ul.registeredEvents[e.Type]
if !ok {
// this should not happen but we handle it anyway
continue
// remove expired events from list asynchronously
go func() {
if err := ul.removeExpiredEvents(userid, eventIDs, resp.Events); err != nil {
ul.log.Error().Err(err).Str("userid", userid).Msg("could not remove expired events from user")
}
event, err := ev.Unmarshal(e.Event)
if err != nil {
// this shouldn't happen either
continue
}
}()
events = append(events, event)
}
return resp.Events, nil
return events, nil
}
func (ul *UserlogService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
evs, err := ul.GetEvents(r.Context(), _adminid)
// DeleteEvents will delete the specified events
func (ul *UserlogService) DeleteEvents(userid string, evids []string) error {
toDelete := make(map[string]struct{})
for _, e := range evids {
toDelete[e] = struct{}{}
}
return ul.alterUserEventList(userid, func(ids []string) []string {
var newids []string
for _, id := range ids {
if _, delete := toDelete[id]; delete {
continue
}
newids = append(newids, id)
}
return newids
})
}
func (ul *UserlogService) impersonate(u *user.UserId) context.Context {
ctx, _, err := utils.Impersonate(u, ul.gwClient, ul.cfg.MachineAuthAPIKey)
if err != nil {
return
ul.log.Error().Err(err).Str("userid", u.GetOpaqueId()).Msg("failed to impersonate user")
return context.Background()
}
// TODO: format response
b, _ := json.Marshal(evs)
w.Write(b)
return ctx
}
func (ul *UserlogService) addEventToUser(userid string, eventid string) error {
func (ul *UserlogService) addEventsToUser(userid string, eventids ...string) error {
return ul.alterUserEventList(userid, func(ids []string) []string {
return append(ids, eventids...)
})
}
func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error {
exists := make(map[string]struct{}, len(received))
for _, e := range received {
exists[e.Id] = struct{}{}
}
var toDelete []string
for _, eid := range all {
if _, ok := exists[eid]; !ok {
toDelete = append(toDelete, eid)
}
}
if len(toDelete) == 0 {
return nil
}
return ul.DeleteEvents(userid, toDelete)
}
func (ul *UserlogService) alterUserEventList(userid string, alter func([]string) []string) error {
recs, err := ul.store.Read(userid)
if err != nil && err != store.ErrNotFound {
return err
@@ -141,7 +266,12 @@ func (ul *UserlogService) addEventToUser(userid string, eventid string) error {
}
}
ids = append(ids, eventid)
ids = alter(ids)
// store reacts unforseeable when trying to store nil values
if len(ids) == 0 {
return ul.store.Delete(userid)
}
b, err := json.Marshal(ids)
if err != nil {
@@ -153,3 +283,158 @@ func (ul *UserlogService) addEventToUser(userid string, eventid string) error {
Value: b,
})
}
// we need the spaceid to inform other space members
// we need an owner to query space members
// we need to check the user has the required role to see the event
func (ul *UserlogService) findSpaceMembers(ctx context.Context, spaceID string, requiredRole permissionChecker) ([]string, error) {
space, err := ul.getSpace(ctx, spaceID)
if err != nil {
return nil, err
}
var users []string
switch space.SpaceType {
case "personal":
users = []string{space.GetOwner().GetId().GetOpaqueId()}
case "project":
if users, err = ul.gatherSpaceMembers(ctx, space, requiredRole); err != nil {
return nil, err
}
default:
// TODO: shares? other space types?
return nil, fmt.Errorf("unsupported space type: %s", space.SpaceType)
}
return users, nil
}
func (ul *UserlogService) getSpace(ctx context.Context, spaceID string) (*storageprovider.StorageSpace, error) {
res, err := ul.gwClient.ListStorageSpaces(ctx, listStorageSpaceRequest(spaceID))
if err != nil {
return nil, err
}
if res.GetStatus().GetCode() != rpc.Code_CODE_OK {
return nil, fmt.Errorf("Unexpected status code while getting space: %v", res.GetStatus().GetCode())
}
if len(res.StorageSpaces) == 0 {
return nil, fmt.Errorf("error getting storage space %s: no space returned", spaceID)
}
return res.StorageSpaces[0], nil
}
func (ul *UserlogService) gatherSpaceMembers(ctx context.Context, space *storageprovider.StorageSpace, hasRequiredRole permissionChecker) ([]string, error) {
var permissionsMap map[string]*storageprovider.ResourcePermissions
if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &permissionsMap); err != nil {
return nil, err
}
groupsMap := make(map[string]struct{})
if opaqueGroups, ok := space.Opaque.Map["groups"]; ok {
_ = json.Unmarshal(opaqueGroups.GetValue(), &groupsMap)
}
// we use a map to avoid duplicates
usermap := make(map[string]struct{})
for id, perm := range permissionsMap {
if !hasRequiredRole(perm) {
// not allowed to receive event
continue
}
if _, isGroup := groupsMap[id]; !isGroup {
usermap[id] = struct{}{}
continue
}
usrs, err := ul.resolveGroup(ctx, id)
if err != nil {
ul.log.Error().Err(err).Str("groupID", id).Msg("failed to resolve group")
continue
}
for _, u := range usrs {
usermap[u] = struct{}{}
}
}
var users []string
for id := range usermap {
users = append(users, id)
}
return users, nil
}
// resolves the users of a group
func (ul *UserlogService) resolveGroup(ctx context.Context, groupID string) ([]string, error) {
r, err := ul.gwClient.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: groupID}})
if err != nil {
return nil, err
}
if r.GetStatus().GetCode() != rpc.Code_CODE_OK {
return nil, fmt.Errorf("unexpected status code from gateway client: %d", r.GetStatus().GetCode())
}
var userIDs []string
for _, m := range r.GetGroup().GetMembers() {
userIDs = append(userIDs, m.GetOpaqueId())
}
return userIDs, nil
}
func (ul *UserlogService) resolveID(ctx context.Context, userid *user.UserId, groupid *group.GroupId) ([]string, error) {
if userid != nil {
return []string{userid.GetOpaqueId()}, nil
}
return ul.resolveGroup(ctx, groupid.GetOpaqueId())
}
func (ul *UserlogService) resolveShare(ctx context.Context, userid *user.UserId, groupid *group.GroupId, spaceid string) ([]string, error) {
users, err := ul.resolveID(ctx, userid, groupid)
if err != nil {
return nil, err
}
usr, err := ul.findSpaceMembers(ctx, spaceid, editor)
if err != nil {
return nil, err
}
return append(users, usr...), nil
}
func listStorageSpaceRequest(spaceID string) *storageprovider.ListStorageSpacesRequest {
return &storageprovider.ListStorageSpacesRequest{
Filters: []*storageprovider.ListStorageSpacesRequest_Filter{
{
Type: storageprovider.ListStorageSpacesRequest_Filter_TYPE_ID,
Term: &storageprovider.ListStorageSpacesRequest_Filter_Id{
Id: &storageprovider.StorageSpaceId{
OpaqueId: spaceID,
},
},
},
},
}
}
type permissionChecker func(*storageprovider.ResourcePermissions) bool
func viewer(perms *storageprovider.ResourcePermissions) bool {
return perms.Stat
}
func editor(perms *storageprovider.ResourcePermissions) bool {
return perms.InitiateFileUpload
}
func manager(perms *storageprovider.ResourcePermissions) bool {
return perms.DenyGrant
}

View File

@@ -0,0 +1,13 @@
package service_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestSearch(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Userlog service Suite")
}

View File

@@ -1,3 +1,143 @@
package service_test
// tests here
import (
"context"
"encoding/json"
"reflect"
"time"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/utils"
cs3mocks "github.com/cs3org/reva/v2/tests/cs3mocks/mocks"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/store"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/owncloud/ocis/v2/services/userlog/mocks"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/service"
"github.com/test-go/testify/mock"
microevents "go-micro.dev/v4/events"
microstore "go-micro.dev/v4/store"
)
var _ = Describe("UserlogService", func() {
var (
cfg = &config.Config{}
ul *service.UserlogService
bus testBus
sto microstore.Store
gwc cs3mocks.GatewayAPIClient
ehc mocks.EventHistoryService
)
BeforeEach(func() {
var err error
sto = store.Create()
bus = testBus(make(chan events.Event))
o := utils.AppendJSONToOpaque(nil, "grants", map[string]*provider.ResourcePermissions{"userid": {Stat: true}})
gwc.On("ListStorageSpaces", mock.Anything, mock.Anything).Return(&provider.ListStorageSpacesResponse{StorageSpaces: []*provider.StorageSpace{
{
Opaque: o,
SpaceType: "project",
},
}, Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil)
gwc.On("GetUser", mock.Anything, mock.Anything).Return(&user.GetUserResponse{User: &user.User{Id: &user.UserId{OpaqueId: "userid"}}, Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil)
gwc.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil)
ul, err = service.NewUserlogService(
service.Config(cfg),
service.Consumer(bus),
service.Store(sto),
service.Logger(log.NewLogger()),
service.Mux(chi.NewMux()),
service.GatewayClient(&gwc),
service.HistoryClient(&ehc),
service.RegisteredEvents([]events.Unmarshaller{
events.UploadReady{},
}),
)
Expect(err).ToNot(HaveOccurred())
})
It("it stores, returns and deletes a couple of events", func() {
ids := make(map[string]struct{})
ids[bus.Publish(events.SpaceCreated{Executant: &user.UserId{OpaqueId: "userid"}})] = struct{}{}
ids[bus.Publish(events.UploadReady{SpaceOwner: &user.UserId{OpaqueId: "userid"}})] = struct{}{}
ids[bus.Publish(events.ContainerCreated{SpaceOwner: &user.UserId{OpaqueId: "userid"}})] = struct{}{}
time.Sleep(500 * time.Millisecond)
var events []*ehmsg.Event
for id := range ids {
events = append(events, &ehmsg.Event{Id: id})
}
ehc.On("GetEvents", mock.Anything, mock.Anything).Return(&ehsvc.GetEventsResponse{Events: events}, nil)
evs, err := ul.GetEvents(context.Background(), "userid")
Expect(err).ToNot(HaveOccurred())
Expect(len(evs)).To(Equal(len(ids)))
var evids []string
for _, e := range evs {
_, exists := ids[e.Id]
Expect(exists).To(BeTrue())
delete(ids, e.Id)
evids = append(evids, e.Id)
}
Expect(len(ids)).To(Equal(0))
err = ul.DeleteEvents("userid", evids)
Expect(err).ToNot(HaveOccurred())
evs, err = ul.GetEvents(context.Background(), "userid")
Expect(err).ToNot(HaveOccurred())
Expect(len(evs)).To(Equal(0))
})
AfterEach(func() {
close(bus)
})
})
type testBus chan events.Event
func (tb testBus) Consume(_ string, _ ...microevents.ConsumeOption) (<-chan microevents.Event, error) {
ch := make(chan microevents.Event)
go func() {
for ev := range tb {
b, _ := json.Marshal(ev.Event)
ch <- microevents.Event{
Payload: b,
Metadata: map[string]string{
events.MetadatakeyEventID: ev.ID,
events.MetadatakeyEventType: ev.Type,
},
}
}
}()
return ch, nil
}
func (tb testBus) Publish(e interface{}) string {
ev := events.Event{
ID: uuid.New().String(),
Type: reflect.TypeOf(e).String(),
Event: e,
}
tb <- ev
return ev.ID
}