Merge pull request #5600 from kobergj/EventHistory

Eventhistory Service
This commit is contained in:
kobergj
2023-02-21 14:08:01 +01:00
committed by GitHub
40 changed files with 1721 additions and 29 deletions

View File

@@ -58,6 +58,7 @@ config = {
"services/auth-basic",
"services/auth-bearer",
"services/auth-machine",
"services/eventhistory",
"services/frontend",
"services/gateway",
"services/graph",

View File

@@ -22,6 +22,7 @@ OCIS_MODULES = \
services/auth-basic \
services/auth-bearer \
services/auth-machine \
services/eventhistory \
services/frontend \
services/gateway \
services/graph \

View File

@@ -0,0 +1,5 @@
Enhancement: Eventhistory service
Introduces the `eventhistory` service. It is a service that stores events and provides a grpc API to retrieve them.
https://github.com/owncloud/ocis/pull/5600

View File

@@ -9,6 +9,7 @@ import (
authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/config"
authbearer "github.com/owncloud/ocis/v2/services/auth-bearer/pkg/config"
authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/config"
eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/config"
gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/config"
graph "github.com/owncloud/ocis/v2/services/graph/pkg/config"
@@ -84,6 +85,7 @@ type Config struct {
AuthBasic *authbasic.Config `yaml:"auth_basic"`
AuthBearer *authbearer.Config `yaml:"auth_bearer"`
AuthMachine *authmachine.Config `yaml:"auth_machine"`
EventHistory *eventhistory.Config `yaml:"eventhistory"`
Frontend *frontend.Config `yaml:"frontend"`
Gateway *gateway.Config `yaml:"gateway"`
Graph *graph.Config `yaml:"graph"`

View File

@@ -7,6 +7,7 @@ import (
authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/config/defaults"
authbearer "github.com/owncloud/ocis/v2/services/auth-bearer/pkg/config/defaults"
authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/config/defaults"
eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config/defaults"
frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/config/defaults"
gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/config/defaults"
graph "github.com/owncloud/ocis/v2/services/graph/pkg/config/defaults"
@@ -48,6 +49,7 @@ func DefaultConfig() *Config {
AuthBasic: authbasic.DefaultConfig(),
AuthBearer: authbearer.DefaultConfig(),
AuthMachine: authmachine.DefaultConfig(),
EventHistory: eventhistory.DefaultConfig(),
Frontend: frontend.DefaultConfig(),
Gateway: gateway.DefaultConfig(),
Graph: graph.DefaultConfig(),

View File

@@ -29,7 +29,7 @@ type Manager struct {
func NewManager(o ...Option) Manager {
opts := newOptions(o...)
nStore := ocisstore.GetStore(opts.storeOptions)
nStore := ocisstore.Create(opts.storeOptions...)
return Manager{
cache: nStore,
roleService: opts.roleService,

View File

@@ -8,7 +8,7 @@ import (
// Options are all the possible options.
type Options struct {
storeOptions ocisstore.OcisStoreOptions
storeOptions []ocisstore.Option
logger log.Logger
roleService settingssvc.RoleService
}
@@ -30,7 +30,8 @@ func RoleService(rs settingssvc.RoleService) Option {
}
}
func StoreOptions(storeOpts ocisstore.OcisStoreOptions) Option {
// StoreOptions are the options for the store
func StoreOptions(storeOpts []ocisstore.Option) Option {
return func(o *Options) {
o.storeOptions = storeOpts
}

50
ocis-pkg/store/options.go Normal file
View File

@@ -0,0 +1,50 @@
package store
import "time"
// Option provides an option to configure the store
type Option func(*Options)
// Type defines the type of the store
func Type(typ string) Option {
return func(o *Options) {
o.Type = typ
}
}
// Addresses defines the addresses where the store can be reached
func Addresses(addrs ...string) Option {
return func(o *Options) {
o.Addresses = addrs
}
}
// Database defines the Database the store should use
func Database(db string) Option {
return func(o *Options) {
o.Database = db
}
}
// Table defines the table the store should use
func Table(t string) Option {
return func(o *Options) {
o.Table = t
}
}
// Size defines the maximum capacity of the store.
// Only applicable when using "ocmem" store
func Size(s int) Option {
return func(o *Options) {
o.Size = s
}
}
// TTL defines the time to life for elements in the store.
// Only applicable when using "natsjs" store
func TTL(t time.Duration) Option {
return func(o *Options) {
o.TTL = t
}
}

View File

@@ -2,8 +2,11 @@ package store
import (
"context"
"strings"
"time"
natsjs "github.com/go-micro/plugins/v4/store/nats-js"
"github.com/go-micro/plugins/v4/store/redis"
"github.com/nats-io/nats.go"
"github.com/owncloud/ocis/v2/ocis-pkg/store/etcd"
"github.com/owncloud/ocis/v2/ocis-pkg/store/memory"
"go-micro.dev/v4/store"
@@ -11,7 +14,8 @@ import (
var ocMemStore *store.Store
type OcisStoreOptions struct {
// Options are the options to configure the store
type Options struct {
// Type determines the implementation:
// * "noop", for a noop store (it does nothing)
// * "etcd", for etcd
@@ -20,10 +24,8 @@ type OcisStoreOptions struct {
// * "memory", for a in-memory implementation, which is the default if noone matches
Type string
// Address is a comma-separated list of nodes that the store
// will use. This is currently usable only with the etcd implementation. If it
// isn't provided, "127.0.0.1:2379" will be the only node used.
Address string
// Address is a list of nodes that the store will use.
Addresses []string
// Size configures the maximum capacity of the cache for
// the "ocmem" implementation, in number of items that the cache can hold per table.
@@ -31,33 +33,50 @@ type OcisStoreOptions struct {
// The parameter only affects to the "ocmem" implementation, the rest will ignore it.
// If an invalid value is used, the default of 512 will be used instead.
Size int
// Database the store should use (optional)
Database string
// Table the store should use (optional)
Table string
// TTL is the time to life for documents stored in the store
TTL time.Duration
}
// GetStore returns a configured key-value store
// Create returns a configured key-value store
//
// Each microservice (or whatever piece is using the store) should use the
// options available in the interface's operations to choose the right database
// and table to prevent collisions with other microservices.
// Recommended approach is to use "services" or "ocis-pkg" for the database,
// and "services/<service-name>/" or "ocis-pkg/<pkg>/" for the package name.
func GetStore(ocisOpts OcisStoreOptions) store.Store {
var s store.Store
addresses := strings.Split(ocisOpts.Address, ",")
opts := []store.Option{
store.Nodes(addresses...),
func Create(opts ...Option) store.Store {
options := &Options{}
for _, o := range opts {
o(options)
}
switch ocisOpts.Type {
storeopts := storeOptions(options)
switch options.Type {
default:
// TODO: better to error in default case?
fallthrough
case "mem":
return store.NewMemoryStore(storeopts...)
case "noop":
s = store.NewNoopStore(opts...)
return store.NewNoopStore(storeopts...)
case "etcd":
s = etcd.NewEtcdStore(opts...)
return etcd.NewEtcdStore(storeopts...)
case "redis":
// FIXME redis plugin does not support redis cluster, sentinel or ring -> needs upstream patch or our implementation
return redis.NewStore(storeopts...)
case "ocmem":
if ocMemStore == nil {
var memStore store.Store
sizeNum := ocisOpts.Size
sizeNum := options.Size
if sizeNum <= 0 {
memStore = memory.NewMultiMemStore()
} else {
@@ -73,9 +92,37 @@ func GetStore(ocisOpts OcisStoreOptions) store.Store {
}
ocMemStore = &memStore
}
s = *ocMemStore
default:
s = store.NewMemoryStore(opts...)
return *ocMemStore
case "nats-js":
// TODO nats needs a DefaultTTL option as it does not support per Write TTL ...
// FIXME nats has restrictions on the key, we cannot use slashes AFAICT
// host, port, clusterid
return natsjs.NewStore(
append(storeopts,
natsjs.NatsOptions(nats.Options{Name: "TODO"}),
natsjs.DefaultTTL(options.TTL),
)...,
) // TODO test with ocis nats
}
return s
}
func storeOptions(o *Options) []store.Option {
var opts []store.Option
if o.Addresses != nil {
opts = append(opts, store.Nodes(o.Addresses...))
}
if o.Database != "" {
opts = append(opts, store.Database(o.Database))
}
if o.Table != "" {
opts = append(opts, store.Table(o.Table))
}
return opts
}

View File

@@ -24,6 +24,7 @@ import (
appRegistry "github.com/owncloud/ocis/v2/services/app-registry/pkg/command"
authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/command"
authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/command"
eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/command"
frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/command"
gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/command"
graph "github.com/owncloud/ocis/v2/services/graph/pkg/command"
@@ -129,6 +130,7 @@ func NewService(options ...Option) (*Service, error) {
s.ServicesRegistry[opts.Config.Notifications.Service.Name] = notifications.NewSutureService
s.ServicesRegistry[opts.Config.Search.Service.Name] = search.NewSutureService
s.ServicesRegistry[opts.Config.Postprocessing.Service.Name] = postprocessing.NewSutureService
s.ServicesRegistry[opts.Config.EventHistory.Service.Name] = eventhistory.NewSutureService
// populate delayed services
s.Delayed[opts.Config.Sharing.Service.Name] = sharing.NewSutureService

View File

@@ -0,0 +1,170 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc (unknown)
// source: ocis/messages/eventhistory/v0/eventhistory.proto
package v0
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Event struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// REQUIRED.
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
// REQUIRED.
Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`
// REQUIRED
Event []byte `protobuf:"bytes,3,opt,name=event,proto3" json:"event,omitempty"`
}
func (x *Event) Reset() {
*x = Event{}
if protoimpl.UnsafeEnabled {
mi := &file_ocis_messages_eventhistory_v0_eventhistory_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Event) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Event) ProtoMessage() {}
func (x *Event) ProtoReflect() protoreflect.Message {
mi := &file_ocis_messages_eventhistory_v0_eventhistory_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Event.ProtoReflect.Descriptor instead.
func (*Event) Descriptor() ([]byte, []int) {
return file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDescGZIP(), []int{0}
}
func (x *Event) GetType() string {
if x != nil {
return x.Type
}
return ""
}
func (x *Event) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *Event) GetEvent() []byte {
if x != nil {
return x.Event
}
return nil
}
var File_ocis_messages_eventhistory_v0_eventhistory_proto protoreflect.FileDescriptor
var file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDesc = []byte{
0x0a, 0x30, 0x6f, 0x63, 0x69, 0x73, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2f,
0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2f, 0x76, 0x30, 0x2f,
0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x1d, 0x6f, 0x63, 0x69, 0x73, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x73, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x76,
0x30, 0x22, 0x41, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79,
0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x0e,
0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14,
0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x65,
0x76, 0x65, 0x6e, 0x74, 0x42, 0x48, 0x5a, 0x46, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
0x6f, 0x6d, 0x2f, 0x6f, 0x77, 0x6e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x6f, 0x63, 0x69, 0x73,
0x2f, 0x76, 0x32, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x65, 0x6e,
0x2f, 0x6f, 0x63, 0x69, 0x73, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2f, 0x65,
0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2f, 0x76, 0x30, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDescOnce sync.Once
file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDescData = file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDesc
)
func file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDescGZIP() []byte {
file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDescOnce.Do(func() {
file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDescData = protoimpl.X.CompressGZIP(file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDescData)
})
return file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDescData
}
var file_ocis_messages_eventhistory_v0_eventhistory_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_ocis_messages_eventhistory_v0_eventhistory_proto_goTypes = []interface{}{
(*Event)(nil), // 0: ocis.messages.eventhistory.v0.Event
}
var file_ocis_messages_eventhistory_v0_eventhistory_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_ocis_messages_eventhistory_v0_eventhistory_proto_init() }
func file_ocis_messages_eventhistory_v0_eventhistory_proto_init() {
if File_ocis_messages_eventhistory_v0_eventhistory_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_ocis_messages_eventhistory_v0_eventhistory_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Event); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_ocis_messages_eventhistory_v0_eventhistory_proto_goTypes,
DependencyIndexes: file_ocis_messages_eventhistory_v0_eventhistory_proto_depIdxs,
MessageInfos: file_ocis_messages_eventhistory_v0_eventhistory_proto_msgTypes,
}.Build()
File_ocis_messages_eventhistory_v0_eventhistory_proto = out.File
file_ocis_messages_eventhistory_v0_eventhistory_proto_rawDesc = nil
file_ocis_messages_eventhistory_v0_eventhistory_proto_goTypes = nil
file_ocis_messages_eventhistory_v0_eventhistory_proto_depIdxs = nil
}

View File

@@ -0,0 +1,15 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: ocis/messages/eventhistory/v0/eventhistory.proto
package v0
import (
fmt "fmt"
proto "google.golang.org/protobuf/proto"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

View File

@@ -0,0 +1,43 @@
{
"swagger": "2.0",
"info": {
"title": "ocis/messages/eventhistory/v0/eventhistory.proto",
"version": "version not set"
},
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"paths": {},
"definitions": {
"protobufAny": {
"type": "object",
"properties": {
"@type": {
"type": "string"
}
},
"additionalProperties": {}
},
"rpcStatus": {
"type": "object",
"properties": {
"code": {
"type": "integer",
"format": "int32"
},
"message": {
"type": "string"
},
"details": {
"type": "array",
"items": {
"$ref": "#/definitions/protobufAny"
}
}
}
}
}
}

View File

@@ -0,0 +1,256 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc (unknown)
// source: ocis/services/eventhistory/v0/eventhistory.proto
package v0
import (
_ "github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2/options"
v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// A request to retrieve events
type GetEventsRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// the ids of the events we want to get
Ids []string `protobuf:"bytes,1,rep,name=ids,proto3" json:"ids,omitempty"`
}
func (x *GetEventsRequest) Reset() {
*x = GetEventsRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_ocis_services_eventhistory_v0_eventhistory_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *GetEventsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetEventsRequest) ProtoMessage() {}
func (x *GetEventsRequest) ProtoReflect() protoreflect.Message {
mi := &file_ocis_services_eventhistory_v0_eventhistory_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetEventsRequest.ProtoReflect.Descriptor instead.
func (*GetEventsRequest) Descriptor() ([]byte, []int) {
return file_ocis_services_eventhistory_v0_eventhistory_proto_rawDescGZIP(), []int{0}
}
func (x *GetEventsRequest) GetIds() []string {
if x != nil {
return x.Ids
}
return nil
}
// The service response
type GetEventsResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Events []*v0.Event `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"`
}
func (x *GetEventsResponse) Reset() {
*x = GetEventsResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_ocis_services_eventhistory_v0_eventhistory_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *GetEventsResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetEventsResponse) ProtoMessage() {}
func (x *GetEventsResponse) ProtoReflect() protoreflect.Message {
mi := &file_ocis_services_eventhistory_v0_eventhistory_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetEventsResponse.ProtoReflect.Descriptor instead.
func (*GetEventsResponse) Descriptor() ([]byte, []int) {
return file_ocis_services_eventhistory_v0_eventhistory_proto_rawDescGZIP(), []int{1}
}
func (x *GetEventsResponse) GetEvents() []*v0.Event {
if x != nil {
return x.Events
}
return nil
}
var File_ocis_services_eventhistory_v0_eventhistory_proto protoreflect.FileDescriptor
var file_ocis_services_eventhistory_v0_eventhistory_proto_rawDesc = []byte{
0x0a, 0x30, 0x6f, 0x63, 0x69, 0x73, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f,
0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2f, 0x76, 0x30, 0x2f,
0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x1d, 0x6f, 0x63, 0x69, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x73, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x76,
0x30, 0x1a, 0x30, 0x6f, 0x63, 0x69, 0x73, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73,
0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2f, 0x76, 0x30,
0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x2d, 0x67, 0x65, 0x6e, 0x2d,
0x6f, 0x70, 0x65, 0x6e, 0x61, 0x70, 0x69, 0x76, 0x32, 0x2f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x22, 0x24, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x69, 0x64, 0x73, 0x18, 0x01,
0x20, 0x03, 0x28, 0x09, 0x52, 0x03, 0x69, 0x64, 0x73, 0x22, 0x51, 0x0a, 0x11, 0x47, 0x65, 0x74,
0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c,
0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x24,
0x2e, 0x6f, 0x63, 0x69, 0x73, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x65,
0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x76, 0x30, 0x2e, 0x45,
0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x32, 0x85, 0x01, 0x0a,
0x13, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x53, 0x65, 0x72,
0x76, 0x69, 0x63, 0x65, 0x12, 0x6e, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74,
0x73, 0x12, 0x2f, 0x2e, 0x6f, 0x63, 0x69, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x73, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x76,
0x30, 0x2e, 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6f, 0x63, 0x69, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63,
0x65, 0x73, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2e,
0x76, 0x30, 0x2e, 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x42, 0xeb, 0x02, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e,
0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x77, 0x6e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x6f, 0x63, 0x69,
0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x6f,
0x63, 0x69, 0x73, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x65, 0x76, 0x65,
0x6e, 0x74, 0x68, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x2f, 0x76, 0x30, 0x92, 0x41, 0xa2, 0x02,
0x12, 0xb8, 0x01, 0x0a, 0x22, 0x6f, 0x77, 0x6e, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x20, 0x49, 0x6e,
0x66, 0x69, 0x6e, 0x69, 0x74, 0x65, 0x20, 0x53, 0x63, 0x61, 0x6c, 0x65, 0x20, 0x74, 0x68, 0x75,
0x6d, 0x62, 0x6e, 0x61, 0x69, 0x6c, 0x73, 0x22, 0x47, 0x0a, 0x0d, 0x6f, 0x77, 0x6e, 0x43, 0x6c,
0x6f, 0x75, 0x64, 0x20, 0x47, 0x6d, 0x62, 0x48, 0x12, 0x20, 0x68, 0x74, 0x74, 0x70, 0x73, 0x3a,
0x2f, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x77, 0x6e,
0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x6f, 0x63, 0x69, 0x73, 0x1a, 0x14, 0x73, 0x75, 0x70, 0x70,
0x6f, 0x72, 0x74, 0x40, 0x6f, 0x77, 0x6e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x63, 0x6f, 0x6d,
0x2a, 0x42, 0x0a, 0x0a, 0x41, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d, 0x32, 0x2e, 0x30, 0x12, 0x34,
0x68, 0x74, 0x74, 0x70, 0x73, 0x3a, 0x2f, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
0x6f, 0x6d, 0x2f, 0x6f, 0x77, 0x6e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x6f, 0x63, 0x69, 0x73,
0x2f, 0x62, 0x6c, 0x6f, 0x62, 0x2f, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x2f, 0x4c, 0x49, 0x43,
0x45, 0x4e, 0x53, 0x45, 0x32, 0x05, 0x31, 0x2e, 0x30, 0x2e, 0x30, 0x2a, 0x02, 0x01, 0x02, 0x32,
0x10, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x6a, 0x73, 0x6f,
0x6e, 0x3a, 0x10, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x6a,
0x73, 0x6f, 0x6e, 0x72, 0x3d, 0x0a, 0x10, 0x44, 0x65, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x72,
0x20, 0x4d, 0x61, 0x6e, 0x75, 0x61, 0x6c, 0x12, 0x29, 0x68, 0x74, 0x74, 0x70, 0x73, 0x3a, 0x2f,
0x2f, 0x6f, 0x77, 0x6e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x64, 0x65, 0x76, 0x2f, 0x73, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x74, 0x68, 0x75, 0x6d, 0x62, 0x6e, 0x61, 0x69, 0x6c,
0x73, 0x2f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_ocis_services_eventhistory_v0_eventhistory_proto_rawDescOnce sync.Once
file_ocis_services_eventhistory_v0_eventhistory_proto_rawDescData = file_ocis_services_eventhistory_v0_eventhistory_proto_rawDesc
)
func file_ocis_services_eventhistory_v0_eventhistory_proto_rawDescGZIP() []byte {
file_ocis_services_eventhistory_v0_eventhistory_proto_rawDescOnce.Do(func() {
file_ocis_services_eventhistory_v0_eventhistory_proto_rawDescData = protoimpl.X.CompressGZIP(file_ocis_services_eventhistory_v0_eventhistory_proto_rawDescData)
})
return file_ocis_services_eventhistory_v0_eventhistory_proto_rawDescData
}
var file_ocis_services_eventhistory_v0_eventhistory_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_ocis_services_eventhistory_v0_eventhistory_proto_goTypes = []interface{}{
(*GetEventsRequest)(nil), // 0: ocis.services.eventhistory.v0.GetEventsRequest
(*GetEventsResponse)(nil), // 1: ocis.services.eventhistory.v0.GetEventsResponse
(*v0.Event)(nil), // 2: ocis.messages.eventhistory.v0.Event
}
var file_ocis_services_eventhistory_v0_eventhistory_proto_depIdxs = []int32{
2, // 0: ocis.services.eventhistory.v0.GetEventsResponse.events:type_name -> ocis.messages.eventhistory.v0.Event
0, // 1: ocis.services.eventhistory.v0.EventHistoryService.GetEvents:input_type -> ocis.services.eventhistory.v0.GetEventsRequest
1, // 2: ocis.services.eventhistory.v0.EventHistoryService.GetEvents:output_type -> ocis.services.eventhistory.v0.GetEventsResponse
2, // [2:3] is the sub-list for method output_type
1, // [1:2] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_ocis_services_eventhistory_v0_eventhistory_proto_init() }
func file_ocis_services_eventhistory_v0_eventhistory_proto_init() {
if File_ocis_services_eventhistory_v0_eventhistory_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_ocis_services_eventhistory_v0_eventhistory_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetEventsRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_ocis_services_eventhistory_v0_eventhistory_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetEventsResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_ocis_services_eventhistory_v0_eventhistory_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_ocis_services_eventhistory_v0_eventhistory_proto_goTypes,
DependencyIndexes: file_ocis_services_eventhistory_v0_eventhistory_proto_depIdxs,
MessageInfos: file_ocis_services_eventhistory_v0_eventhistory_proto_msgTypes,
}.Build()
File_ocis_services_eventhistory_v0_eventhistory_proto = out.File
file_ocis_services_eventhistory_v0_eventhistory_proto_rawDesc = nil
file_ocis_services_eventhistory_v0_eventhistory_proto_goTypes = nil
file_ocis_services_eventhistory_v0_eventhistory_proto_depIdxs = nil
}

View File

@@ -0,0 +1,91 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: ocis/services/eventhistory/v0/eventhistory.proto
package v0
import (
fmt "fmt"
_ "github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2/options"
_ "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
proto "google.golang.org/protobuf/proto"
math "math"
)
import (
context "context"
api "go-micro.dev/v4/api"
client "go-micro.dev/v4/client"
server "go-micro.dev/v4/server"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// Reference imports to suppress errors if they are not otherwise used.
var _ api.Endpoint
var _ context.Context
var _ client.Option
var _ server.Option
// Api Endpoints for EventHistoryService service
func NewEventHistoryServiceEndpoints() []*api.Endpoint {
return []*api.Endpoint{}
}
// Client API for EventHistoryService service
type EventHistoryService interface {
// returns the specified events
GetEvents(ctx context.Context, in *GetEventsRequest, opts ...client.CallOption) (*GetEventsResponse, error)
}
type eventHistoryService struct {
c client.Client
name string
}
func NewEventHistoryService(name string, c client.Client) EventHistoryService {
return &eventHistoryService{
c: c,
name: name,
}
}
func (c *eventHistoryService) GetEvents(ctx context.Context, in *GetEventsRequest, opts ...client.CallOption) (*GetEventsResponse, error) {
req := c.c.NewRequest(c.name, "EventHistoryService.GetEvents", in)
out := new(GetEventsResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for EventHistoryService service
type EventHistoryServiceHandler interface {
// returns the specified events
GetEvents(context.Context, *GetEventsRequest, *GetEventsResponse) error
}
func RegisterEventHistoryServiceHandler(s server.Server, hdlr EventHistoryServiceHandler, opts ...server.HandlerOption) error {
type eventHistoryService interface {
GetEvents(ctx context.Context, in *GetEventsRequest, out *GetEventsResponse) error
}
type EventHistoryService struct {
eventHistoryService
}
h := &eventHistoryServiceHandler{hdlr}
return s.Handle(s.NewHandler(&EventHistoryService{h}, opts...))
}
type eventHistoryServiceHandler struct {
EventHistoryServiceHandler
}
func (h *eventHistoryServiceHandler) GetEvents(ctx context.Context, in *GetEventsRequest, out *GetEventsResponse) error {
return h.EventHistoryServiceHandler.GetEvents(ctx, in, out)
}

View File

@@ -0,0 +1,95 @@
{
"swagger": "2.0",
"info": {
"title": "ownCloud Infinite Scale thumbnails",
"version": "1.0.0",
"contact": {
"name": "ownCloud GmbH",
"url": "https://github.com/owncloud/ocis",
"email": "support@owncloud.com"
},
"license": {
"name": "Apache-2.0",
"url": "https://github.com/owncloud/ocis/blob/master/LICENSE"
}
},
"tags": [
{
"name": "EventHistoryService"
}
],
"schemes": [
"http",
"https"
],
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"paths": {},
"definitions": {
"protobufAny": {
"type": "object",
"properties": {
"@type": {
"type": "string"
}
},
"additionalProperties": {}
},
"rpcStatus": {
"type": "object",
"properties": {
"code": {
"type": "integer",
"format": "int32"
},
"message": {
"type": "string"
},
"details": {
"type": "array",
"items": {
"$ref": "#/definitions/protobufAny"
}
}
}
},
"v0Event": {
"type": "object",
"properties": {
"type": {
"type": "string",
"description": "REQUIRED."
},
"id": {
"type": "string",
"description": "REQUIRED."
},
"event": {
"type": "string",
"format": "byte",
"title": "REQUIRED"
}
}
},
"v0GetEventsResponse": {
"type": "object",
"properties": {
"events": {
"type": "array",
"items": {
"$ref": "#/definitions/v0Event"
}
}
},
"title": "The service response"
}
},
"externalDocs": {
"description": "Developer Manual",
"url": "https://owncloud.dev/services/thumbnails/"
}
}

View File

@@ -21,7 +21,9 @@ plugins:
ocis.services.thumbnails.v0;\
ocis.messages.thumbnails.v0;\
ocis.services.store.v0;\
ocis.messages.store.v0"
ocis.messages.store.v0;\
ocis.services.eventhistory.v0;\
ocis.messages.eventhistory.v0"
- name: openapiv2
path: ../../.bingo/protoc-gen-openapiv2

View File

@@ -0,0 +1,16 @@
syntax = "proto3";
package ocis.messages.eventhistory.v0;
option go_package = "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0";
message Event {
// REQUIRED.
string type = 1;
// REQUIRED.
string id = 2;
// REQUIRED
bytes event = 3;
}

View File

@@ -0,0 +1,50 @@
syntax = "proto3";
package ocis.services.eventhistory.v0;
option go_package = "github.com/owncloud/ocis/protogen/gen/ocis/services/eventhistory/v0";
import "ocis/messages/eventhistory/v0/eventhistory.proto";
import "protoc-gen-openapiv2/options/annotations.proto";
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {
info: {
title: "ownCloud Infinite Scale thumbnails";
version: "1.0.0";
contact: {
name: "ownCloud GmbH";
url: "https://github.com/owncloud/ocis";
email: "support@owncloud.com";
};
license: {
name: "Apache-2.0";
url: "https://github.com/owncloud/ocis/blob/master/LICENSE";
};
};
schemes: HTTP;
schemes: HTTPS;
consumes: "application/json";
produces: "application/json";
external_docs: {
description: "Developer Manual";
url: "https://owncloud.dev/services/thumbnails/";
};
};
// A Service for storing events
service EventHistoryService {
// returns the specified events
rpc GetEvents(GetEventsRequest) returns (GetEventsResponse);
}
// A request to retrieve events
message GetEventsRequest {
// the ids of the events we want to get
repeated string ids = 1;
}
// The service response
message GetEventsResponse {
repeated ocis.messages.eventhistory.v0.Event events = 1;
}

View File

@@ -0,0 +1,37 @@
SHELL := bash
NAME := eventhistory
include ../../.make/recursion.mk
############ tooling ############
ifneq (, $(shell command -v go 2> /dev/null)) # suppress `command not found warnings` for non go targets in CI
include ../../.bingo/Variables.mk
endif
############ go tooling ############
include ../../.make/go.mk
############ release ############
include ../../.make/release.mk
############ docs generate ############
include ../../.make/docs.mk
.PHONY: docs-generate
docs-generate: config-docs-generate
############ generate ############
include ../../.make/generate.mk
.PHONY: ci-go-generate
ci-go-generate: # CI runs ci-node-generate automatically before this target
.PHONY: ci-node-generate
ci-node-generate:
############ licenses ############
.PHONY: ci-node-check-licenses
ci-node-check-licenses:
.PHONY: ci-node-save-licenses
ci-node-save-licenses:

View File

@@ -0,0 +1,29 @@
# Eventhistory Service
The `eventhistory` consumes all events from the configured event system like NATS, stores them and allows other services to retrieve them via an eventid.
## Prerequisites
Running the eventhistory service without an event sytem like NATS is not possible.
## Consuming
The `eventhistory` services consumes all events from the configured event sytem.
## Storing
The `eventhistory` service stores each consumed event via the configured store in `EVENTHISTORY_STORE_TYPE`. Possible stores are:
- `mem`: Basic in-memory store and the default.
- `ocmem`: Advanced in-memory store allowing max size.
- `redis`: Stores data in a configured redis cluster.
- `etcd`: Stores data in a configured etcd cluster.
- `nats-js`: Stores data using key-value-store feature of [nats jetstream](https://docs.nats.io/nats-concepts/jetstream/key-value-store)
- `noop`: Stores nothing. Useful for testing. Not recommended in productive enviroments.
1. Note that in-memory stores are by nature not reboot persistent.
2. Though usually not necessary, a database name and a database table can be configured for event stores if the event store supports this. Generally not applicapable for stores of type `in-memory`. These settings are blank by default which means that the standard settings of the configured store applies.
3. Events stay in the store for 2 weeks by default. Use `EVENTHISTORY_RECORD_EXPIRY` to adjust this value.
## Retrieving
Other services can call the `eventhistory` service via a grpc call to retrieve events. The request must contain the eventid that should be retrieved.

View File

@@ -0,0 +1,14 @@
package main
import (
"os"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/command"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config/defaults"
)
func main() {
if err := command.Execute(defaults.DefaultConfig()); err != nil {
os.Exit(1)
}
}

View File

@@ -0,0 +1,18 @@
package command
import (
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"github.com/urfave/cli/v2"
)
// Health is the entrypoint for the health command.
func Health(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "health",
Usage: "Check health status",
Action: func(c *cli.Context) error {
// Not implemented
return nil
},
}
}

View File

@@ -0,0 +1,59 @@
package command
import (
"context"
"os"
"github.com/owncloud/ocis/v2/ocis-pkg/clihelper"
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"github.com/thejerf/suture/v4"
"github.com/urfave/cli/v2"
)
// GetCommands provides all commands for this service
func GetCommands(cfg *config.Config) cli.Commands {
return []*cli.Command{
// start this service
Server(cfg),
// interaction with this service
// infos about this service
Health(cfg),
Version(cfg),
}
}
// Execute is the entry point for the eventhistory command.
func Execute(cfg *config.Config) error {
app := clihelper.DefaultApp(&cli.App{
Name: "eventhistory",
Usage: "starts eventhistory service",
Commands: GetCommands(cfg),
})
return app.Run(os.Args)
}
// SutureService allows for the eventhistory command to be embedded and supervised by a suture supervisor tree.
type SutureService struct {
cfg *config.Config
}
// NewSutureService creates a new eventhistory.SutureService
func NewSutureService(cfg *ociscfg.Config) suture.Service {
cfg.Notifications.Commons = cfg.Commons
return SutureService{
cfg: cfg.EventHistory,
}
}
func (s SutureService) Serve(ctx context.Context) error {
s.cfg.Context = ctx
if err := Execute(s.cfg); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,91 @@
package command
import (
"context"
"fmt"
"strings"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/store"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/logging"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/metrics"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/server/grpc"
"github.com/urfave/cli/v2"
)
// Server is the entrypoint for the server command.
func Server(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "server",
Usage: fmt.Sprintf("start the %s service without runtime (unsupervised mode)", cfg.Service.Name),
Category: "server",
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)
err := ogrpc.Configure(ogrpc.GetClientOptions(cfg.GRPCClientTLS)...)
if err != nil {
return err
}
var (
gr = run.Group{}
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
metrics = metrics.New()
)
defer cancel()
metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1)
consumer, err := stream.NatsFromConfig(stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
st := store.Create(
store.Type(cfg.Store.Type),
store.Addresses(strings.Split(cfg.Store.Addresses, ",")...),
store.Database(cfg.Store.Database),
store.Table(cfg.Store.Table),
store.TTL(cfg.Store.RecordExpiry),
)
service := grpc.NewService(
grpc.Logger(logger),
grpc.Context(ctx),
grpc.Config(cfg),
grpc.Name(cfg.Service.Name),
grpc.Namespace(cfg.GRPC.Namespace),
grpc.Address(cfg.GRPC.Addr),
grpc.Metrics(metrics),
grpc.Consumer(consumer),
grpc.Store(st),
)
gr.Add(service.Run, func(err error) {
logger.Error().
Err(err).
Str("server", "grpc").
Msg("Shutting Down server")
cancel()
})
return gr.Run()
},
}
}

View File

@@ -0,0 +1,19 @@
package command
import (
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"github.com/urfave/cli/v2"
)
// Version prints the service versions of all running instances.
func Version(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "version",
Usage: "print the version of this binary and the running service instances",
Category: "info",
Action: func(c *cli.Context) error {
// not implemented
return nil
},
}
}

View File

@@ -0,0 +1,52 @@
package config
import (
"context"
"time"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
)
// Config combines all available configuration parts.
type Config struct {
Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service
Service Service `yaml:"-"`
Log *Log `yaml:"log"`
Debug Debug `yaml:"debug"`
GRPC GRPCConfig `yaml:"grpc"`
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
Events Events `yaml:"events"`
Store Store `yaml:"store"`
Context context.Context `yaml:"-"`
}
// GRPCConfig defines the available grpc configuration.
type GRPCConfig struct {
Addr string `ocisConfig:"addr" env:"EVENTHISTORY_GRPC_ADDR" desc:"The bind address of the GRPC service."`
Namespace string `ocisConfig:"-" yaml:"-"`
TLS *shared.GRPCServiceTLS `yaml:"tls"`
}
// Store configures the store to use
type Store struct {
Type string `yaml:"type" env:"EVENTHISTORY_STORE_TYPE" desc:"The type of the eventhistory store. Supported values are: 'mem', 'ocmem', 'etcd', 'redis', 'nats-js', 'noop'. See the text description for details."`
Addresses string `yaml:"addresses" env:"EVENTHISTORY_STORE_ADDRESSES" desc:"A comma separated list of addresses to access the configured store. This has no effect when 'in-memory' stores are configured. Note that the behaviour how addresses are used is dependent on the library of the configured store."`
Database string `yaml:"database" env:"EVENTHISTORY_STORE_DATABASE" desc:"(optional) The database name the configured store should use. This has no effect when 'in-memory' stores are configured."`
Table string `yaml:"table" env:"EVENTHISTORY_STORE_TABLE" desc:"(optional) The database table the store should use. This has no effect when 'in-memory' stores are configured."`
RecordExpiry time.Duration `yaml:"record_expiry" env:"EVENTHISTORY_RECORD_EXPIRY" desc:"Time to life for events in the store. The duration can be set as number followed by a unit identifier like s, m or h. Defaults to '336h' (2 weeks)."`
Size int `yaml:"size" env:"EVENTHISTORY_STORE_SIZE" desc:"The maximum quantity of items in the store. Only applies when store type 'ocmem' is configured. Defaults to 512."`
}
// Events combines the configuration options for the event bus.
type Events struct {
Endpoint string `yaml:"endpoint" env:"EVENTHISTORY_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
Cluster string `yaml:"cluster" env:"EVENTHISTORY_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;EVENTHISTORY_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"EVENTHISTORY_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided NOTIFICATIONS_EVENTS_TLS_INSECURE will be seen as false."`
EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;EVENTHISTORY_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."`
}

View File

@@ -0,0 +1,9 @@
package config
// Debug defines the available debug configuration.
type Debug struct {
Addr string `yaml:"addr" env:"EVENTHISTORY_DEBUG_ADDR" desc:"Bind address of the debug server, where metrics, health, config and debug endpoints will be exposed."`
Token string `yaml:"token" env:"EVENTHISTORY_DEBUG_TOKEN" desc:"Token to secure the metrics endpoint."`
Pprof bool `yaml:"pprof" env:"EVENTHISTORY_DEBUG_PPROF" desc:"Enables pprof, which can be used for profiling."`
Zpages bool `yaml:"zpages" env:"EVENTHISTORY_DEBUG_ZPAGES" desc:"Enables zpages, which can be used for collecting and viewing in-memory traces."`
}

View File

@@ -0,0 +1,74 @@
package defaults
import (
"time"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
)
// FullDefaultConfig returns the full default config
func FullDefaultConfig() *config.Config {
cfg := DefaultConfig()
EnsureDefaults(cfg)
Sanitize(cfg)
return cfg
}
// DefaultConfig return the default configuration
func DefaultConfig() *config.Config {
return &config.Config{
Service: config.Service{
Name: "eventhistory",
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
EnableTLS: false,
},
Store: config.Store{
Type: "mem",
RecordExpiry: 336 * time.Hour,
},
GRPC: config.GRPCConfig{
Addr: "127.0.0.1:0",
Namespace: "com.owncloud.api",
},
}
}
// EnsureDefaults ensures the config contains default values
func EnsureDefaults(cfg *config.Config) {
// provide with defaults for shared logging, since we need a valid destination address for "envdecode".
if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil {
cfg.Log = &config.Log{
Level: cfg.Commons.Log.Level,
Pretty: cfg.Commons.Log.Pretty,
Color: cfg.Commons.Log.Color,
File: cfg.Commons.Log.File,
}
} else if cfg.Log == nil {
cfg.Log = &config.Log{}
}
if cfg.GRPCClientTLS == nil {
cfg.GRPCClientTLS = &shared.GRPCClientTLS{}
if cfg.Commons != nil && cfg.Commons.GRPCClientTLS != nil {
cfg.GRPCClientTLS = cfg.Commons.GRPCClientTLS
}
}
if cfg.GRPC.TLS == nil {
cfg.GRPC.TLS = &shared.GRPCServiceTLS{}
if cfg.Commons != nil && cfg.Commons.GRPCServiceTLS != nil {
cfg.GRPC.TLS.Enabled = cfg.Commons.GRPCServiceTLS.Enabled
cfg.GRPC.TLS.Cert = cfg.Commons.GRPCServiceTLS.Cert
cfg.GRPC.TLS.Key = cfg.Commons.GRPCServiceTLS.Key
}
}
}
// Sanitize sanitizes the config
func Sanitize(cfg *config.Config) {
// nothing to sanitize here atm
}

View File

@@ -0,0 +1,9 @@
package config
// Log defines the available log configuration.
type Log struct {
Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;EVENTHISTORY_LOG_LEVEL" desc:"The log level. Valid values are: \"panic\", \"fatal\", \"error\", \"warn\", \"info\", \"debug\", \"trace\"."`
Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;EVENTHISTORY_LOG_PRETTY" desc:"Activates pretty log output."`
Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;EVENTHISTORY_LOG_COLOR" desc:"Activates colorized log output."`
File string `mapstructure:"file" env:"OCIS_LOG_FILE;EVENTHISTORY_LOG_FILE" desc:"The path to the log file. Activates logging to this file if set."`
}

View File

@@ -0,0 +1,38 @@
package parser
import (
"errors"
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config/defaults"
"github.com/owncloud/ocis/v2/ocis-pkg/config/envdecode"
)
// ParseConfig loads configuration from known paths.
func ParseConfig(cfg *config.Config) error {
_, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg)
if err != nil {
return err
}
defaults.EnsureDefaults(cfg)
// load all env variables relevant to the config in the current context.
if err := envdecode.Decode(cfg); err != nil {
// no environment variable set for this config is an expected "error"
if !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) {
return err
}
}
defaults.Sanitize(cfg)
return Validate(cfg)
}
// Validate validates the config
func Validate(cfg *config.Config) error {
return nil
}

View File

@@ -0,0 +1,6 @@
package config
// Service defines the available service configuration.
type Service struct {
Name string `yaml:"-"`
}

View File

@@ -0,0 +1,17 @@
package logging
import (
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
)
// LoggerFromConfig initializes a service-specific logger instance.
func Configure(name string, cfg *config.Log) log.Logger {
return log.NewLogger(
log.Name(name),
log.Level(cfg.Level),
log.Pretty(cfg.Pretty),
log.Color(cfg.Color),
log.File(cfg.File),
)
}

View File

@@ -0,0 +1,35 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
var (
// Namespace defines the namespace for the defines metrics.
Namespace = "ocis"
// Subsystem defines the subsystem for the defines metrics.
Subsystem = "eventhistory"
)
// Metrics defines the available metrics of this service.
type Metrics struct {
BuildInfo *prometheus.GaugeVec
}
// New initializes the available metrics.
func New() *Metrics {
m := &Metrics{
BuildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "build_info",
Help: "Build information",
}, []string{"version"}),
}
_ = prometheus.Register(
m.BuildInfo,
)
// TODO: implement metrics
return m
}

View File

@@ -0,0 +1,110 @@
package grpc
import (
"context"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/metrics"
"github.com/urfave/cli/v2"
"go-micro.dev/v4/store"
)
// Option defines a single option function.
type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {
Name string
Address string
Logger log.Logger
Context context.Context
Config *config.Config
Metrics *metrics.Metrics
Namespace string
Flags []cli.Flag
Store store.Store
Consumer events.Consumer
}
// newOptions initializes the available default options.
func newOptions(opts ...Option) Options {
opt := Options{}
for _, o := range opts {
o(&opt)
}
return opt
}
// Logger provides a function to set the logger option.
func Logger(val log.Logger) Option {
return func(o *Options) {
o.Logger = val
}
}
// Name provides a name for the service.
func Name(val string) Option {
return func(o *Options) {
o.Name = val
}
}
// Address provides an address for the service.
func Address(val string) Option {
return func(o *Options) {
o.Address = val
}
}
// Context provides a function to set the context option.
func Context(val context.Context) Option {
return func(o *Options) {
o.Context = val
}
}
// Config provides a function to set the config option.
func Config(val *config.Config) Option {
return func(o *Options) {
o.Config = val
}
}
// Metrics provides a function to set the metrics option.
func Metrics(val *metrics.Metrics) Option {
return func(o *Options) {
o.Metrics = val
}
}
// Namespace provides a function to set the namespace option.
func Namespace(val string) Option {
return func(o *Options) {
o.Namespace = val
}
}
// Flags provides a function to set the flags option.
func Flags(flags []cli.Flag) Option {
return func(o *Options) {
o.Flags = append(o.Flags, flags...)
}
}
// Store provides a function to configure the store
func Store(store store.Store) Option {
return func(o *Options) {
o.Store = store
}
}
// Consumer provides a function to configure the consumer
func Consumer(consumer events.Consumer) Option {
return func(o *Options) {
o.Consumer = consumer
}
}

View File

@@ -0,0 +1,46 @@
package grpc
import (
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
svc "github.com/owncloud/ocis/v2/services/eventhistory/pkg/service"
)
// NewService initializes the grpc service and server.
func NewService(opts ...Option) grpc.Service {
options := newOptions(opts...)
service, err := grpc.NewService(
grpc.TLSEnabled(options.Config.GRPC.TLS.Enabled),
grpc.TLSCert(
options.Config.GRPC.TLS.Cert,
options.Config.GRPC.TLS.Key,
),
grpc.Logger(options.Logger),
grpc.Namespace(options.Namespace),
grpc.Name(options.Name),
grpc.Version(version.GetString()),
grpc.Address(options.Address),
grpc.Context(options.Context),
grpc.Flags(options.Flags...),
grpc.Version(version.GetString()),
)
if err != nil {
options.Logger.Fatal().Err(err).Msg("Error creating event history service")
return grpc.Service{}
}
eh, err := svc.NewEventHistoryService(options.Config, options.Consumer, options.Store, options.Logger)
if err != nil {
options.Logger.Fatal().Err(err).Msg("Error creating event history service")
return grpc.Service{}
}
_ = ehsvc.RegisterEventHistoryServiceHandler(
service.Server(),
eh,
)
return service
}

View File

@@ -0,0 +1,77 @@
package service
import (
"context"
"fmt"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"go-micro.dev/v4/store"
)
// EventHistoryService is the service responsible for event history
type EventHistoryService struct {
ch <-chan events.Event
store store.Store
cfg *config.Config
log log.Logger
}
// NewEventHistoryService returns an EventHistory service
func NewEventHistoryService(cfg *config.Config, consumer events.Consumer, store store.Store, log log.Logger) (*EventHistoryService, error) {
if consumer == nil || store == nil {
return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", consumer, store)
}
ch, err := events.ConsumeAll(consumer, "evhistory")
if err != nil {
return nil, err
}
eh := &EventHistoryService{ch: ch, store: store, cfg: cfg, log: log}
go eh.StoreEvents()
return eh, nil
}
// StoreEvents consumes all events and stores them in the store. Will block
func (eh *EventHistoryService) StoreEvents() {
for event := range eh.ch {
if err := eh.store.Write(&store.Record{
Key: event.ID,
Value: event.Event.([]byte),
Expiry: eh.cfg.Store.RecordExpiry,
Metadata: map[string]interface{}{
"type": event.Type,
},
}); err != nil {
// we can't store. That's it for us.
eh.log.Error().Err(err).Str("eventid", event.ID).Msg("could not store event")
return
}
}
}
// GetEvents allows to retrieve events from the eventstore by id
func (eh *EventHistoryService) GetEvents(ctx context.Context, req *ehsvc.GetEventsRequest, resp *ehsvc.GetEventsResponse) error {
for _, id := range req.Ids {
evs, err := eh.store.Read(id)
if err != nil {
if err != store.ErrNotFound {
eh.log.Error().Err(err).Str("eventid", id).Msg("could not read event")
}
continue
}
resp.Events = append(resp.Events, &ehmsg.Event{
Id: id,
Event: evs[0].Value,
Type: evs[0].Metadata["type"].(string),
})
}
return nil
}

View File

@@ -0,0 +1,13 @@
package service_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestSearch(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Service Suite")
}

View File

@@ -0,0 +1,89 @@
package service_test
import (
"context"
"encoding/json"
"reflect"
"time"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/store"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/service"
microevents "go-micro.dev/v4/events"
microstore "go-micro.dev/v4/store"
)
var _ = Describe("EventHistoryService", func() {
var (
cfg = &config.Config{}
eh *service.EventHistoryService
bus testBus
sto microstore.Store
)
BeforeEach(func() {
var err error
sto = store.Create()
bus = testBus(make(chan events.Event))
eh, err = service.NewEventHistoryService(cfg, bus, sto, log.Logger{})
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
close(bus)
})
It("Records events, stores them and allows to retrieve them", func() {
id := bus.Publish(events.UploadReady{})
// service will store eventually
time.Sleep(500 * time.Millisecond)
resp := &ehsvc.GetEventsResponse{}
err := eh.GetEvents(context.Background(), &ehsvc.GetEventsRequest{Ids: []string{id}}, resp)
Expect(err).ToNot(HaveOccurred())
Expect(resp).ToNot(BeNil())
Expect(len(resp.Events)).To(Equal(1))
Expect(resp.Events[0].Id).To(Equal(id))
})
})
type testBus chan events.Event
func (tb testBus) Consume(_ string, _ ...microevents.ConsumeOption) (<-chan microevents.Event, error) {
ch := make(chan microevents.Event)
go func() {
for ev := range tb {
b, _ := json.Marshal(ev.Event)
ch <- microevents.Event{
Payload: b,
Metadata: map[string]string{
events.MetadatakeyEventID: ev.ID,
events.MetadatakeyEventType: ev.Type,
},
}
}
}()
return ch, nil
}
func (tb testBus) Publish(e interface{}) string {
ev := events.Event{
ID: uuid.New().String(),
Type: reflect.TypeOf(e).String(),
Event: e,
}
tb <- ev
return ev.ID
}

View File

@@ -7,6 +7,7 @@ import (
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
@@ -156,10 +157,10 @@ func NewService(opts ...Option) (Graph, error) {
roleManager := options.RoleManager
if roleManager == nil {
storeOptions := store.OcisStoreOptions{
Type: options.Config.CacheStore.Type,
Address: options.Config.CacheStore.Address,
Size: options.Config.CacheStore.Size,
storeOptions := []store.Option{
store.Type(options.Config.CacheStore.Type),
store.Addresses(strings.Split(options.Config.CacheStore.Address, ",")...),
store.Size(options.Config.CacheStore.Size),
}
m := roles.NewManager(
roles.StoreOptions(storeOptions),