sharpen eventhistory service

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-02-06 13:08:55 +01:00
parent 5aa1f6eb55
commit c9dfb778a9
20 changed files with 310 additions and 48 deletions

View File

@@ -58,6 +58,7 @@ config = {
"services/auth-basic",
"services/auth-bearer",
"services/auth-machine",
"services/eventhistory",
"services/frontend",
"services/gateway",
"services/graph",

View File

@@ -22,6 +22,7 @@ OCIS_MODULES = \
services/auth-basic \
services/auth-bearer \
services/auth-machine \
services/eventhistory \
services/frontend \
services/gateway \
services/graph \

View File

@@ -1,5 +1,5 @@
Enhancement: Eventhistory service
Introduces the `eventhistory` service. It is a service that is storing events and providing a grpc API to retrieve them
Introduces the `eventhistory` service. It is a service that stores events and provides a grpc API to retrieve them.
https://github.com/owncloud/ocis/pull/5600

View File

@@ -9,6 +9,7 @@ import (
authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/config"
authbearer "github.com/owncloud/ocis/v2/services/auth-bearer/pkg/config"
authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/config"
eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/config"
gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/config"
graph "github.com/owncloud/ocis/v2/services/graph/pkg/config"
@@ -84,6 +85,7 @@ type Config struct {
AuthBasic *authbasic.Config `yaml:"auth_basic"`
AuthBearer *authbearer.Config `yaml:"auth_bearer"`
AuthMachine *authmachine.Config `yaml:"auth_machine"`
EventHistory *eventhistory.Config `yaml:"eventhistory"`
Frontend *frontend.Config `yaml:"frontend"`
Gateway *gateway.Config `yaml:"gateway"`
Graph *graph.Config `yaml:"graph"`

View File

@@ -7,6 +7,7 @@ import (
authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/config/defaults"
authbearer "github.com/owncloud/ocis/v2/services/auth-bearer/pkg/config/defaults"
authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/config/defaults"
eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config/defaults"
frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/config/defaults"
gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/config/defaults"
graph "github.com/owncloud/ocis/v2/services/graph/pkg/config/defaults"
@@ -48,6 +49,7 @@ func DefaultConfig() *Config {
AuthBasic: authbasic.DefaultConfig(),
AuthBearer: authbearer.DefaultConfig(),
AuthMachine: authmachine.DefaultConfig(),
EventHistory: eventhistory.DefaultConfig(),
Frontend: frontend.DefaultConfig(),
Gateway: gateway.DefaultConfig(),
Graph: graph.DefaultConfig(),

View File

@@ -29,7 +29,7 @@ type Manager struct {
func NewManager(o ...Option) Manager {
opts := newOptions(o...)
nStore := ocisstore.GetStore(opts.storeOptions)
nStore := ocisstore.Create(opts.storeOptions...)
return Manager{
cache: nStore,
roleService: opts.roleService,

View File

@@ -8,7 +8,7 @@ import (
// Options are all the possible options.
type Options struct {
storeOptions ocisstore.OcisStoreOptions
storeOptions []ocisstore.Option
logger log.Logger
roleService settingssvc.RoleService
}
@@ -30,7 +30,8 @@ func RoleService(rs settingssvc.RoleService) Option {
}
}
func StoreOptions(storeOpts ocisstore.OcisStoreOptions) Option {
// StoreOptions are the options for the store
func StoreOptions(storeOpts []ocisstore.Option) Option {
return func(o *Options) {
o.storeOptions = storeOpts
}

50
ocis-pkg/store/options.go Normal file
View File

@@ -0,0 +1,50 @@
package store
import "time"
// Option provides an option to configure the store
type Option func(*Options)
// Type defines the type of the store
func Type(typ string) Option {
return func(o *Options) {
o.Type = typ
}
}
// Addresses defines the addresses where the store can be reached
func Addresses(addrs ...string) Option {
return func(o *Options) {
o.Addresses = addrs
}
}
// Database defines the Database the store should use
func Database(db string) Option {
return func(o *Options) {
o.Database = db
}
}
// Table defines the table the store should use
func Table(t string) Option {
return func(o *Options) {
o.Table = t
}
}
// Size defines the maximum capacity of the store.
// Only applicable when using "ocmem" store
func Size(s int) Option {
return func(o *Options) {
o.Size = s
}
}
// TTL defines the time to life for elements in the store.
// Only applicable when using "natsjs" store
func TTL(t time.Duration) Option {
return func(o *Options) {
o.TTL = t
}
}

View File

@@ -2,8 +2,11 @@ package store
import (
"context"
"strings"
"time"
natsjs "github.com/go-micro/plugins/v4/store/nats-js"
"github.com/go-micro/plugins/v4/store/redis"
"github.com/nats-io/nats.go"
"github.com/owncloud/ocis/v2/ocis-pkg/store/etcd"
"github.com/owncloud/ocis/v2/ocis-pkg/store/memory"
"go-micro.dev/v4/store"
@@ -11,7 +14,8 @@ import (
var ocMemStore *store.Store
type OcisStoreOptions struct {
// Options are the options to configure the store
type Options struct {
// Type determines the implementation:
// * "noop", for a noop store (it does nothing)
// * "etcd", for etcd
@@ -20,10 +24,8 @@ type OcisStoreOptions struct {
// * "memory", for a in-memory implementation, which is the default if noone matches
Type string
// Address is a comma-separated list of nodes that the store
// will use. This is currently usable only with the etcd implementation. If it
// isn't provided, "127.0.0.1:2379" will be the only node used.
Address string
// Address is a list of nodes that the store will use.
Addresses []string
// Size configures the maximum capacity of the cache for
// the "ocmem" implementation, in number of items that the cache can hold per table.
@@ -31,33 +33,50 @@ type OcisStoreOptions struct {
// The parameter only affects to the "ocmem" implementation, the rest will ignore it.
// If an invalid value is used, the default of 512 will be used instead.
Size int
// Database the store should use (optional)
Database string
// Table the store should use (optional)
Table string
// TTL is the time to life for documents stored in the store
TTL time.Duration
}
// GetStore returns a configured key-value store
// Create returns a configured key-value store
//
// Each microservice (or whatever piece is using the store) should use the
// options available in the interface's operations to choose the right database
// and table to prevent collisions with other microservices.
// Recommended approach is to use "services" or "ocis-pkg" for the database,
// and "services/<service-name>/" or "ocis-pkg/<pkg>/" for the package name.
func GetStore(ocisOpts OcisStoreOptions) store.Store {
var s store.Store
addresses := strings.Split(ocisOpts.Address, ",")
opts := []store.Option{
store.Nodes(addresses...),
func Create(opts ...Option) store.Store {
options := &Options{}
for _, o := range opts {
o(options)
}
switch ocisOpts.Type {
storeopts := storeOptions(options)
switch options.Type {
default:
// TODO: better to error in default case?
fallthrough
case "mem":
return store.NewMemoryStore(storeopts...)
case "noop":
s = store.NewNoopStore(opts...)
return store.NewNoopStore(storeopts...)
case "etcd":
s = etcd.NewEtcdStore(opts...)
return etcd.NewEtcdStore(storeopts...)
case "redis":
// FIXME redis plugin does not support redis cluster, sentinel or ring -> needs upstream patch or our implementation
return redis.NewStore(storeopts...)
case "ocmem":
if ocMemStore == nil {
var memStore store.Store
sizeNum := ocisOpts.Size
sizeNum := options.Size
if sizeNum <= 0 {
memStore = memory.NewMultiMemStore()
} else {
@@ -73,9 +92,37 @@ func GetStore(ocisOpts OcisStoreOptions) store.Store {
}
ocMemStore = &memStore
}
s = *ocMemStore
default:
s = store.NewMemoryStore(opts...)
return *ocMemStore
case "nats-js":
// TODO nats needs a DefaultTTL option as it does not support per Write TTL ...
// FIXME nats has restrictions on the key, we cannot use slashes AFAICT
// host, port, clusterid
return natsjs.NewStore(
append(storeopts,
natsjs.NatsOptions(nats.Options{Name: "TODO"}),
natsjs.DefaultTTL(options.TTL),
)...,
) // TODO test with ocis nats
}
return s
}
func storeOptions(o *Options) []store.Option {
var opts []store.Option
if o.Addresses != nil {
opts = append(opts, store.Nodes(o.Addresses...))
}
if o.Database != "" {
opts = append(opts, store.Database(o.Database))
}
if o.Table != "" {
opts = append(opts, store.Table(o.Table))
}
return opts
}

View File

@@ -24,6 +24,7 @@ import (
appRegistry "github.com/owncloud/ocis/v2/services/app-registry/pkg/command"
authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/command"
authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/command"
eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/command"
frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/command"
gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/command"
graph "github.com/owncloud/ocis/v2/services/graph/pkg/command"
@@ -129,6 +130,7 @@ func NewService(options ...Option) (*Service, error) {
s.ServicesRegistry[opts.Config.Notifications.Service.Name] = notifications.NewSutureService
s.ServicesRegistry[opts.Config.Search.Service.Name] = search.NewSutureService
s.ServicesRegistry[opts.Config.Postprocessing.Service.Name] = postprocessing.NewSutureService
s.ServicesRegistry[opts.Config.EventHistory.Service.Name] = eventhistory.NewSutureService
// populate delayed services
s.Delayed[opts.Config.Sharing.Service.Name] = sharing.NewSutureService

View File

@@ -1,15 +1,29 @@
# Eventhistory service
# Eventhistory Service
The `eventhistory` consumes all events from the configured event systems, stores them and allows to retrieve them via an eventid
The `eventhistory` consumes all events from the configured event system like NATS, stores them and allows other services to retrieve them via an eventid.
## Prerequisites
Running the eventhistory service without an event sytem like NATS is not possible.
## Consuming
The `eventhistory` services consumes all events from the configured event sytem. Running it without an event sytem is not possible.
The `eventhistory` services consumes all events from the configured event sytem.
## Storing
The `eventhistory` stores each consumed event in the configured store. Possible stores are ? and ? but not ?.
The `eventhistory` service stores each consumed event via the configured store in `EVENTHISTORY_STORE_TYPE`. Possible stores are:
- `mem`: Basic in-memory store and the default.
- `ocmem`: Advanced in-memory store allowing max size.
- `redis`: Stores data in a configured redis cluster.
- `etcd`: Stores data in a configured etcd cluster.
- `nats-js`: Stores data using key-value-store feature of [nats jetstream](https://docs.nats.io/nats-concepts/jetstream/key-value-store)
- `noop`: Stores nothing. Useful for testing. Not recommended in productive enviroments.
1. Note that in-memory stores are by nature not reboot persistent.
2. Though usually not necessary, a database name and a database table can be configured for event stores if the event store supports this. Generally not applicapable for stores of type `in-memory`. These settings are blank by default which means that the standard settings of the configured store applies.
3. Events stay in the store for 2 weeks by default. Use `EVENTHISTORY_RECORD_EXPIRY` to adjust this value.
## Retrieving
Other services can call the `eventhistory` service via a grpc call to retrieve events. The request must contain the eventid that should be retrieved
Other services can call the `eventhistory` service via a grpc call to retrieve events. The request must contain the eventid that should be retrieved.

View File

@@ -45,7 +45,7 @@ type SutureService struct {
func NewSutureService(cfg *ociscfg.Config) suture.Service {
cfg.Notifications.Commons = cfg.Commons
return SutureService{
//cfg: cfg.Notifications,
cfg: cfg.EventHistory,
}
}

View File

@@ -3,11 +3,13 @@ package command
import (
"context"
"fmt"
"strings"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/store"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config/parser"
@@ -15,7 +17,6 @@ import (
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/metrics"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/server/grpc"
"github.com/urfave/cli/v2"
"go-micro.dev/v4/store"
)
// Server is the entrypoint for the server command.
@@ -54,8 +55,13 @@ func Server(cfg *config.Config) *cli.Command {
return err
}
// TODO: configure store
st := store.DefaultStore
st := store.Create(
store.Type(cfg.Store.Type),
store.Addresses(strings.Split(cfg.Store.Addresses, ",")...),
store.Database(cfg.Store.Database),
store.Table(cfg.Store.Table),
store.TTL(cfg.Store.RecordExpiry),
)
service := grpc.NewService(
grpc.Logger(logger),
@@ -69,11 +75,11 @@ func Server(cfg *config.Config) *cli.Command {
grpc.Store(st),
)
gr.Add(service.Run, func(_ error) {
gr.Add(service.Run, func(err error) {
logger.Error().
Err(err).
Str("server", "grpc").
Msg("Shutting down server")
Msg("Shutting Down server")
cancel()
})

View File

@@ -34,7 +34,12 @@ type GRPCConfig struct {
// Store configures the store to use
type Store struct {
RecordExpiry time.Duration `yaml:"record_expiry" env:"RECORD_EXPIRY" desc:"time to life for events in the store"`
Type string `yaml:"type" env:"EVENTHISTORY_STORE_TYPE" desc:"The type of the eventhistory store. Supported values are: 'mem', 'ocmem', 'etcd', 'redis', 'nats-js', 'noop'. See the text description for details."`
Addresses string `yaml:"addresses" env:"EVENTHISTORY_STORE_ADDRESSES" desc:"A comma separated list of addresses to access the configured store. This has no effect when 'in-memory' stores are configured. Note that the behaviour how addresses are used is dependent on the library of the configured store."`
Database string `yaml:"database" env:"EVENTHISTORY_STORE_DATABASE" desc:"(optional) The database name the configured store should use. This has no effect when 'in-memory' stores are configured."`
Table string `yaml:"table" env:"EVENTHISTORY_STORE_TABLE" desc:"(optional) The database table the store should use. This has no effect when 'in-memory' stores are configured."`
RecordExpiry time.Duration `yaml:"record_expiry" env:"EVENTHISTORY_RECORD_EXPIRY" desc:"Time to life for events in the store. The duration can be set as number followed by a unit identifier like s, m or h. Defaults to '336h' (2 weeks)."`
Size int `yaml:"size" env:"EVENTHISTORY_STORE_SIZE" desc:"The maximum quantity of items in the store. Only applies when store type 'ocmem' is configured. Defaults to 512."`
}
// Events combines the configuration options for the event bus.

View File

@@ -1,6 +1,8 @@
package defaults
import (
"time"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
)
@@ -19,6 +21,19 @@ func DefaultConfig() *config.Config {
Service: config.Service{
Name: "eventhistory",
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
EnableTLS: false,
},
Store: config.Store{
Type: "mem",
RecordExpiry: 336 * time.Hour,
},
GRPC: config.GRPCConfig{
Addr: "127.0.0.1:0",
Namespace: "com.owncloud.api",
},
}
}
@@ -42,6 +57,15 @@ func EnsureDefaults(cfg *config.Config) {
cfg.GRPCClientTLS = cfg.Commons.GRPCClientTLS
}
}
if cfg.GRPC.TLS == nil {
cfg.GRPC.TLS = &shared.GRPCServiceTLS{}
if cfg.Commons != nil && cfg.Commons.GRPCServiceTLS != nil {
cfg.GRPC.TLS.Enabled = cfg.Commons.GRPCServiceTLS.Enabled
cfg.GRPC.TLS.Cert = cfg.Commons.GRPCServiceTLS.Cert
cfg.GRPC.TLS.Key = cfg.Commons.GRPCServiceTLS.Key
}
}
}
// Sanitize sanitizes the config

View File

@@ -31,7 +31,7 @@ func NewService(opts ...Option) grpc.Service {
return grpc.Service{}
}
eh, err := svc.NewEventHistoryService(options.Config, options.Consumer, options.Store)
eh, err := svc.NewEventHistoryService(options.Config, options.Consumer, options.Store, options.Logger)
if err != nil {
options.Logger.Fatal().Err(err).Msg("Error creating event history service")
return grpc.Service{}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
@@ -16,10 +17,11 @@ type EventHistoryService struct {
ch <-chan events.Event
store store.Store
cfg *config.Config
log log.Logger
}
// NewEventHistoryService returns an EventHistory service
func NewEventHistoryService(cfg *config.Config, consumer events.Consumer, store store.Store) (*EventHistoryService, error) {
func NewEventHistoryService(cfg *config.Config, consumer events.Consumer, store store.Store, log log.Logger) (*EventHistoryService, error) {
if consumer == nil || store == nil {
return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", consumer, store)
}
@@ -29,7 +31,7 @@ func NewEventHistoryService(cfg *config.Config, consumer events.Consumer, store
return nil, err
}
eh := &EventHistoryService{ch: ch, store: store, cfg: cfg}
eh := &EventHistoryService{ch: ch, store: store, cfg: cfg, log: log}
go eh.StoreEvents()
return eh, nil
@@ -42,8 +44,12 @@ func (eh *EventHistoryService) StoreEvents() {
Key: event.ID,
Value: event.Event.([]byte),
Expiry: eh.cfg.Store.RecordExpiry,
Metadata: map[string]interface{}{
"type": event.Type,
},
}); err != nil {
// we can't store. That's it for us.
eh.log.Error().Err(err).Str("eventid", event.ID).Msg("could not store event")
return
}
}
@@ -54,15 +60,16 @@ func (eh *EventHistoryService) GetEvents(ctx context.Context, req *ehsvc.GetEven
for _, id := range req.Ids {
evs, err := eh.store.Read(id)
if err != nil {
// TODO: Handle!
// return?
// gather errors and add to response?
if err != store.ErrNotFound {
eh.log.Error().Err(err).Str("eventid", id).Msg("could not read event")
}
continue
}
resp.Events = append(resp.Events, &ehmsg.Event{
Id: id,
Event: evs[0].Value,
Type: evs[0].Metadata["type"].(string),
})
}

View File

@@ -0,0 +1,13 @@
package service_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestSearch(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Service Suite")
}

View File

@@ -1,3 +1,89 @@
package service_test
// tests here
import (
"context"
"encoding/json"
"reflect"
"time"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/store"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/service"
microevents "go-micro.dev/v4/events"
microstore "go-micro.dev/v4/store"
)
var _ = Describe("EventHistoryService", func() {
var (
cfg = &config.Config{}
eh *service.EventHistoryService
bus testBus
sto microstore.Store
)
BeforeEach(func() {
var err error
sto = store.Create()
bus = testBus(make(chan events.Event))
eh, err = service.NewEventHistoryService(cfg, bus, sto, log.Logger{})
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
close(bus)
})
It("Records events, stores them and allows to retrieve them", func() {
id := bus.Publish(events.UploadReady{})
// service will store eventually
time.Sleep(500 * time.Millisecond)
resp := &ehsvc.GetEventsResponse{}
err := eh.GetEvents(context.Background(), &ehsvc.GetEventsRequest{Ids: []string{id}}, resp)
Expect(err).ToNot(HaveOccurred())
Expect(resp).ToNot(BeNil())
Expect(len(resp.Events)).To(Equal(1))
Expect(resp.Events[0].Id).To(Equal(id))
})
})
type testBus chan events.Event
func (tb testBus) Consume(_ string, _ ...microevents.ConsumeOption) (<-chan microevents.Event, error) {
ch := make(chan microevents.Event)
go func() {
for ev := range tb {
b, _ := json.Marshal(ev.Event)
ch <- microevents.Event{
Payload: b,
Metadata: map[string]string{
events.MetadatakeyEventID: ev.ID,
events.MetadatakeyEventType: ev.Type,
},
}
}
}()
return ch, nil
}
func (tb testBus) Publish(e interface{}) string {
ev := events.Event{
ID: uuid.New().String(),
Type: reflect.TypeOf(e).String(),
Event: e,
}
tb <- ev
return ev.ID
}

View File

@@ -7,6 +7,7 @@ import (
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
@@ -156,10 +157,10 @@ func NewService(opts ...Option) (Graph, error) {
roleManager := options.RoleManager
if roleManager == nil {
storeOptions := store.OcisStoreOptions{
Type: options.Config.CacheStore.Type,
Address: options.Config.CacheStore.Address,
Size: options.Config.CacheStore.Size,
storeOptions := []store.Option{
store.Type(options.Config.CacheStore.Type),
store.Addresses(strings.Split(options.Config.CacheStore.Address, ",")...),
store.Size(options.Config.CacheStore.Size),
}
m := roles.NewManager(
roles.StoreOptions(storeOptions),