Merge pull request #7217 from kobergj/ClientlogService

Clientlog Service
This commit is contained in:
kobergj
2023-09-07 15:26:06 +02:00
committed by GitHub
46 changed files with 1289 additions and 347 deletions
+38
View File
@@ -0,0 +1,38 @@
SHELL := bash
NAME := clientlog
include ../../.make/recursion.mk
############ tooling ############
ifneq (, $(shell command -v go 2> /dev/null)) # suppress `command not found warnings` for non go targets in CI
include ../../.bingo/Variables.mk
endif
############ go tooling ############
include ../../.make/go.mk
############ release ############
include ../../.make/release.mk
############ docs generate ############
include ../../.make/docs.mk
.PHONY: docs-generate
docs-generate: config-docs-generate
############ generate ############
include ../../.make/generate.mk
.PHONY: ci-go-generate
ci-go-generate: $(MOCKERY) # CI runs ci-node-generate automatically before this target
$(MOCKERY) --dir ../../protogen/gen/ocis/services/eventhistory/v0 --case underscore --name EventHistoryService
.PHONY: ci-node-generate
ci-node-generate:
############ licenses ############
.PHONY: ci-node-check-licenses
ci-node-check-licenses:
.PHONY: ci-node-save-licenses
ci-node-save-licenses:
+14
View File
@@ -0,0 +1,14 @@
# Clientlog Service
The `clientlog` service is responsible for composing machine readable notifications for clients. Clients are apps and web interfaces.
## The Log Service Ecosystem
Log services like the `userlog`, `clientlog` and `sse` are responsible for composing notifications for a certain audience.
- The `userlog` service translates and adjusts messages to be human readable.
- The `clientlog` service composes machine readable messages, so clients can act without the need to query the server.
- The `sse` service is only responsible for sending these messages. It does not care about their form or language.
## Clientlog Events
The messages the `clientlog` service sends are intended for the use by clients, not by users. The client might for example be informed that a file has finished post-processing. With that, the client can make the file available to the user without additional server queries.
+14
View File
@@ -0,0 +1,14 @@
package main
import (
"os"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/command"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config/defaults"
)
func main() {
if err := command.Execute(defaults.DefaultConfig()); err != nil {
os.Exit(1)
}
}
+18
View File
@@ -0,0 +1,18 @@
package command
import (
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config"
"github.com/urfave/cli/v2"
)
// Health is the entrypoint for the health command.
func Health(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "health",
Usage: "Check health status",
Action: func(c *cli.Context) error {
// Not implemented
return nil
},
}
}
+34
View File
@@ -0,0 +1,34 @@
package command
import (
"os"
"github.com/owncloud/ocis/v2/ocis-pkg/clihelper"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config"
"github.com/urfave/cli/v2"
)
// GetCommands provides all commands for this service
func GetCommands(cfg *config.Config) cli.Commands {
return []*cli.Command{
// start this service
Server(cfg),
// interaction with this service
// infos about this service
Health(cfg),
Version(cfg),
}
}
// Execute is the entry point for the clientlog command.
func Execute(cfg *config.Config) error {
app := clihelper.DefaultApp(&cli.App{
Name: "clientlog",
Usage: "starts clientlog service",
Commands: GetCommands(cfg),
})
return app.Run(os.Args)
}
+128
View File
@@ -0,0 +1,128 @@
package command
import (
"context"
"fmt"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/logging"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/metrics"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/service"
"github.com/urfave/cli/v2"
)
// all events we care about
var _registeredEvents = []events.Unmarshaller{
events.UploadReady{},
}
// Server is the entrypoint for the server command.
func Server(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "server",
Usage: fmt.Sprintf("start the %s service without runtime (unsupervised mode)", cfg.Service.Name),
Category: "server",
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)
tracerProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name)
if err != nil {
return err
}
gr := run.Group{}
ctx, cancel := func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
mtrcs := metrics.New()
mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1)
defer cancel()
stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
tm, err := pool.StringToTLSMode(cfg.GRPCClientTLS.Mode)
if err != nil {
return err
}
gatewaySelector, err := pool.GatewaySelector(
cfg.RevaGateway,
pool.WithTLSCACert(cfg.GRPCClientTLS.CACert),
pool.WithTLSMode(tm),
pool.WithRegistry(registry.GetRegistry()),
pool.WithTracerProvider(tracerProvider),
)
if err != nil {
return fmt.Errorf("could not get reva client selector: %s", err)
}
{
svc, err := service.NewClientlogService(
service.Logger(logger),
service.Config(cfg),
service.Stream(stream),
service.GatewaySelector(gatewaySelector),
service.RegisteredEvents(_registeredEvents),
service.TraceProvider(tracerProvider),
)
if err != nil {
logger.Info().Err(err).Str("transport", "http").Msg("Failed to initialize server")
return err
}
gr.Add(func() error {
return svc.Run()
}, func(err error) {
logger.Error().
Str("transport", "http").
Err(err).
Msg("Shutting down server")
cancel()
})
}
{
server := debug.NewService(
debug.Logger(logger),
debug.Name(cfg.Service.Name),
debug.Version(version.GetString()),
debug.Address(cfg.Debug.Addr),
debug.Token(cfg.Debug.Token),
debug.Pprof(cfg.Debug.Pprof),
debug.Zpages(cfg.Debug.Zpages),
debug.Health(handlers.Health),
debug.Ready(handlers.Ready),
)
gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
}
return gr.Run()
},
}
}
+19
View File
@@ -0,0 +1,19 @@
package command
import (
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config"
"github.com/urfave/cli/v2"
)
// Version prints the service versions of all running instances.
func Version(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "version",
Usage: "print the version of this binary and the running service instances",
Category: "info",
Action: func(c *cli.Context) error {
// not implemented
return nil
},
}
}
+49
View File
@@ -0,0 +1,49 @@
package config
import (
"context"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
)
// Config combines all available configuration parts.
type Config struct {
Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service
Service Service `yaml:"-"`
Tracing *Tracing `yaml:"tracing"`
Log *Log `yaml:"log"`
Debug Debug `yaml:"debug"`
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
TokenManager *TokenManager `yaml:"token_manager"`
RevaGateway string `yaml:"reva_gateway" env:"OCIS_REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata"`
Events Events `yaml:"events"`
ServiceAccount ServiceAccount `yaml:"service_account"`
Context context.Context `yaml:"-"`
}
// Events combines the configuration options for the event bus.
type Events struct {
Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;CLIENTLOG_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;CLIENTLOG_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;CLIENTLOG_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;CLIENTLOG_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided NOTIFICATIONS_EVENTS_TLS_INSECURE will be seen as false."`
EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;CLIENTLOG_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."`
}
// TokenManager is the config for using the reva token manager
type TokenManager struct {
JWTSecret string `yaml:"jwt_secret" env:"OCIS_JWT_SECRET;CLIENTLOG_JWT_SECRET" desc:"The secret to mint and validate jwt tokens."`
}
// ServiceAccount is the configuration for the used service account
type ServiceAccount struct {
ServiceAccountID string `yaml:"service_account_id" env:"OCIS_SERVICE_ACCOUNT_ID;CLIENTLOG_SERVICE_ACCOUNT_ID" desc:"The ID of the service account the service should use. See the 'auth-service' service description for more details."`
ServiceAccountSecret string `yaml:"service_account_secret" env:"OCIS_SERVICE_ACCOUNT_SECRET;CLIENTLOG_SERVICE_ACCOUNT_SECRET" desc:"The service account secret."`
}
+9
View File
@@ -0,0 +1,9 @@
package config
// Debug defines the available debug configuration.
type Debug struct {
Addr string `yaml:"addr" env:"USERLOG_DEBUG_ADDR" desc:"Bind address of the debug server, where metrics, health, config and debug endpoints will be exposed."`
Token string `yaml:"token" env:"USERLOG_DEBUG_TOKEN" desc:"Token to secure the metrics endpoint."`
Pprof bool `yaml:"pprof" env:"USERLOG_DEBUG_PPROF" desc:"Enables pprof, which can be used for profiling."`
Zpages bool `yaml:"zpages" env:"USERLOG_DEBUG_ZPAGES" desc:"Enables zpages, which can be used for collecting and viewing in-memory traces."`
}
@@ -0,0 +1,80 @@
package defaults
import (
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
"github.com/owncloud/ocis/v2/ocis-pkg/structs"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config"
)
// FullDefaultConfig returns the full default config
func FullDefaultConfig() *config.Config {
cfg := DefaultConfig()
EnsureDefaults(cfg)
Sanitize(cfg)
return cfg
}
// DefaultConfig return the default configuration
func DefaultConfig() *config.Config {
return &config.Config{
Debug: config.Debug{
Addr: "127.0.0.1:9260",
Token: "",
Pprof: false,
Zpages: false,
},
Service: config.Service{
Name: "clientlog",
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
EnableTLS: false,
},
RevaGateway: shared.DefaultRevaConfig().Address,
}
}
// EnsureDefaults ensures the config contains default values
func EnsureDefaults(cfg *config.Config) {
// provide with defaults for shared logging, since we need a valid destination address for "envdecode".
if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil {
cfg.Log = &config.Log{
Level: cfg.Commons.Log.Level,
Pretty: cfg.Commons.Log.Pretty,
Color: cfg.Commons.Log.Color,
File: cfg.Commons.Log.File,
}
} else if cfg.Log == nil {
cfg.Log = &config.Log{}
}
if cfg.GRPCClientTLS == nil && cfg.Commons != nil {
cfg.GRPCClientTLS = structs.CopyOrZeroValue(cfg.Commons.GRPCClientTLS)
}
if cfg.TokenManager == nil && cfg.Commons != nil && cfg.Commons.TokenManager != nil {
cfg.TokenManager = &config.TokenManager{
JWTSecret: cfg.Commons.TokenManager.JWTSecret,
}
} else if cfg.TokenManager == nil {
cfg.TokenManager = &config.TokenManager{}
}
// provide with defaults for shared tracing, since we need a valid destination address for "envdecode".
if cfg.Tracing == nil && cfg.Commons != nil && cfg.Commons.Tracing != nil {
cfg.Tracing = &config.Tracing{
Enabled: cfg.Commons.Tracing.Enabled,
Type: cfg.Commons.Tracing.Type,
Endpoint: cfg.Commons.Tracing.Endpoint,
Collector: cfg.Commons.Tracing.Collector,
}
} else if cfg.Tracing == nil {
cfg.Tracing = &config.Tracing{}
}
}
// Sanitize sanitizes the config
func Sanitize(cfg *config.Config) {
// sanitize config
}
+9
View File
@@ -0,0 +1,9 @@
package config
// Log defines the available log configuration.
type Log struct {
Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;USERLOG_LOG_LEVEL" desc:"The log level. Valid values are: 'panic', 'fatal', 'error', 'warn', 'info', 'debug', 'trace'."`
Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;USERLOG_LOG_PRETTY" desc:"Activates pretty log output."`
Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;USERLOG_LOG_COLOR" desc:"Activates colorized log output."`
File string `mapstructure:"file" env:"OCIS_LOG_FILE;USERLOG_LOG_FILE" desc:"The path to the log file. Activates logging to this file if set."`
}
@@ -0,0 +1,43 @@
package parser
import (
"errors"
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config/defaults"
"github.com/owncloud/ocis/v2/ocis-pkg/config/envdecode"
)
// ParseConfig loads configuration from known paths.
func ParseConfig(cfg *config.Config) error {
_, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg)
if err != nil {
return err
}
defaults.EnsureDefaults(cfg)
// load all env variables relevant to the config in the current context.
if err := envdecode.Decode(cfg); err != nil {
// no environment variable set for this config is an expected "error"
if !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) {
return err
}
}
defaults.Sanitize(cfg)
return Validate(cfg)
}
// Validate validates the config
func Validate(cfg *config.Config) error {
if cfg.TokenManager.JWTSecret == "" {
return shared.MissingJWTTokenError(cfg.Service.Name)
}
return nil
}
+6
View File
@@ -0,0 +1,6 @@
package config
// Service defines the available service configuration.
type Service struct {
Name string `yaml:"-"`
}
+21
View File
@@ -0,0 +1,21 @@
package config
import "github.com/owncloud/ocis/v2/ocis-pkg/tracing"
// Tracing defines the available tracing configuration.
type Tracing struct {
Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;CLIENTLOG_TRACING_ENABLED" desc:"Activates tracing."`
Type string `yaml:"type" env:"OCIS_TRACING_TYPE;CLIENTLOG_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."`
Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;CLIENTLOG_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."`
Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;CLIENTLOG_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."`
}
// Convert Tracing to the tracing package's Config struct.
func (t Tracing) Convert() tracing.Config {
return tracing.Config{
Enabled: t.Enabled,
Type: t.Type,
Endpoint: t.Endpoint,
Collector: t.Collector,
}
}
+17
View File
@@ -0,0 +1,17 @@
package logging
import (
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config"
)
// LoggerFromConfig initializes a service-specific logger instance.
func Configure(name string, cfg *config.Log) log.Logger {
return log.NewLogger(
log.Name(name),
log.Level(cfg.Level),
log.Pretty(cfg.Pretty),
log.Color(cfg.Color),
log.File(cfg.File),
)
}
+35
View File
@@ -0,0 +1,35 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
var (
// Namespace defines the namespace for the defines metrics.
Namespace = "ocis"
// Subsystem defines the subsystem for the defines metrics.
Subsystem = "clientlog"
)
// Metrics defines the available metrics of this service.
type Metrics struct {
BuildInfo *prometheus.GaugeVec
}
// New initializes the available metrics.
func New() *Metrics {
m := &Metrics{
BuildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "build_info",
Help: "Build information",
}, []string{"version"}),
}
_ = prometheus.Register(
m.BuildInfo,
)
// TODO: implement metrics
return m
}
+65
View File
@@ -0,0 +1,65 @@
package service
import (
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config"
"go.opentelemetry.io/otel/trace"
)
// Option for the clientlog service
type Option func(*Options)
// Options for the clientlog service
type Options struct {
Logger log.Logger
Stream events.Stream
Config *config.Config
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
RegisteredEvents []events.Unmarshaller
TraceProvider trace.TracerProvider
}
// Logger configures a logger for the clientlog service
func Logger(log log.Logger) Option {
return func(o *Options) {
o.Logger = log
}
}
// Stream configures an event stream for the clientlog service
func Stream(s events.Stream) Option {
return func(o *Options) {
o.Stream = s
}
}
// Config adds the config for the clientlog service
func Config(c *config.Config) Option {
return func(o *Options) {
o.Config = c
}
}
// GatewaySelector adds a grpc client selector for the gateway service
func GatewaySelector(gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) Option {
return func(o *Options) {
o.GatewaySelector = gatewaySelector
}
}
// RegisteredEvents registers the events the service should listen to
func RegisteredEvents(e []events.Unmarshaller) Option {
return func(o *Options) {
o.RegisteredEvents = e
}
}
// TraceProvider adds a tracer provider for the clientlog service
func TraceProvider(tp trace.TracerProvider) Option {
return func(o *Options) {
o.TraceProvider = tp
}
}
+140
View File
@@ -0,0 +1,140 @@
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/clientlog/pkg/config"
"go.opentelemetry.io/otel/trace"
)
// ClientNotification is the event the clientlog service is sending to the client
type ClientNotification struct {
Type string
ItemID string
}
// ClientlogService is the service responsible for user activities
type ClientlogService struct {
log log.Logger
cfg *config.Config
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
registeredEvents map[string]events.Unmarshaller // ?
tp trace.TracerProvider
tracer trace.Tracer
publisher events.Publisher
ch <-chan events.Event
}
// NewClientlogService returns a clientlog service
func NewClientlogService(opts ...Option) (*ClientlogService, error) {
o := &Options{}
for _, opt := range opts {
opt(o)
}
if o.Stream == nil {
return nil, fmt.Errorf("need non nil stream (%v) to work properly", o.Stream)
}
ch, err := events.Consume(o.Stream, "clientlog", o.RegisteredEvents...)
if err != nil {
return nil, err
}
cl := &ClientlogService{
log: o.Logger,
cfg: o.Config,
gatewaySelector: o.GatewaySelector,
registeredEvents: make(map[string]events.Unmarshaller),
tp: o.TraceProvider,
tracer: o.TraceProvider.Tracer("github.com/owncloud/ocis/services/clientlog/pkg/service"),
publisher: o.Stream,
ch: ch,
}
for _, e := range o.RegisteredEvents {
typ := reflect.TypeOf(e)
cl.registeredEvents[typ.String()] = e
}
return cl, nil
}
// Run runs the service
func (cl *ClientlogService) Run() error {
for event := range cl.ch {
cl.processEvent(event)
}
return nil
}
func (cl *ClientlogService) processEvent(event events.Event) {
gwc, err := cl.gatewaySelector.Next()
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error getting gatway client")
return
}
ctx, err := utils.GetServiceUserContext(cl.cfg.ServiceAccount.ServiceAccountID, gwc, cl.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error authenticating service user")
return
}
var (
users []string
noti ClientNotification
)
switch e := event.Event.(type) {
default:
err = errors.New("unhandled event")
case events.UploadReady:
info, err := utils.GetResource(ctx, e.FileRef, gwc)
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error getting resource")
return
}
noti.Type = "postprocessing-finished"
noti.ItemID = storagespace.FormatResourceID(*info.GetId())
users, err = utils.GetSpaceMembers(ctx, info.GetSpace().GetId().GetOpaqueId(), gwc, utils.ViewerRole)
}
if err != nil {
cl.log.Info().Err(err).Interface("event", event).Msg("error gathering members for event")
return
}
// II) instruct sse service to send the information
for _, id := range users {
if err := cl.sendSSE(id, noti); err != nil {
cl.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
return
}
}
}
func (cl *ClientlogService) sendSSE(userid string, noti ClientNotification) error {
b, err := json.Marshal(noti)
if err != nil {
return err
}
return events.Publish(context.Background(), cl.publisher, events.SendSSE{
UserID: userid,
Type: "clientlog-notification",
Message: b,
})
}
+7
View File
@@ -2,6 +2,13 @@
The `sse` service is responsible for sending sse (Server-Sent Events) to a user. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples of server sent events.
## The Log Service Ecosystem
Log services like the `userlog`, `clientlog` and `sse` are responsible for composing notifications for a certain audience.
- The `userlog` service translates and adjusts messages to be human readable.
- The `clientlog` service composes machine readable messages, so clients can act without the need to query the server.
- The `sse` service is only responsible for sending these messages. It does not care about their form or language.
## Subscribing
Clients can subscribe to the `/sse` endpoint to be informed by the server when an event happens. The `sse` endpoint will respect language changes of the user without needing to reconnect. Note that SSE has a limitation of six open connections per browser which can be reached if one has opened various tabs of the Web UI pointing to the same Infinite Scale instance.
+7
View File
@@ -2,6 +2,13 @@
The `userlog` service is a mediator between the `eventhistory` service and clients who want to be informed about user related events. It provides an API to retrieve those.
## The Log Service Ecosystem
Log services like the `userlog`, `clientlog` and `sse` are responsible for composing notifications for a certain audience.
- The `userlog` service translates and adjusts messages to be human readable.
- The `clientlog` service composes machine readable messages, so clients can act without the need to query the server.
- The `sse` service is only responsible for sending these messages. It does not care about their form or language.
## Prerequisites
Running the `userlog` service without running the `eventhistory` service is not possible.
+4 -5
View File
@@ -22,11 +22,10 @@ type Config struct {
TokenManager *TokenManager `yaml:"token_manager"`
MachineAuthAPIKey string `yaml:"machine_auth_api_key" env:"OCIS_MACHINE_AUTH_API_KEY;USERLOG_MACHINE_AUTH_API_KEY" desc:"Machine auth API key used to validate internal requests necessary to access resources from other services."`
RevaGateway string `yaml:"reva_gateway" env:"OCIS_REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata"`
TranslationPath string `yaml:"translation_path" env:"OCIS_TRANSLATION_PATH;USERLOG_TRANSLATION_PATH" desc:"(optional) Set this to a path with custom translations to overwrite the builtin translations. Note that file and folder naming rules apply, see the documentation for more details."`
Events Events `yaml:"events"`
Persistence Persistence `yaml:"persistence"`
RevaGateway string `yaml:"reva_gateway" env:"OCIS_REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata"`
TranslationPath string `yaml:"translation_path" env:"OCIS_TRANSLATION_PATH;USERLOG_TRANSLATION_PATH" desc:"(optional) Set this to a path with custom translations to overwrite the builtin translations. Note that file and folder naming rules apply, see the documentation for more details."`
Events Events `yaml:"events"`
Persistence Persistence `yaml:"persistence"`
DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer receive sse notifications."`
@@ -69,10 +69,6 @@ func EnsureDefaults(cfg *config.Config) {
cfg.Log = &config.Log{}
}
if cfg.MachineAuthAPIKey == "" && cfg.Commons != nil && cfg.Commons.MachineAuthAPIKey != "" {
cfg.MachineAuthAPIKey = cfg.Commons.MachineAuthAPIKey
}
if cfg.GRPCClientTLS == nil && cfg.Commons != nil {
cfg.GRPCClientTLS = structs.CopyOrZeroValue(cfg.Commons.GRPCClientTLS)
}
@@ -35,10 +35,6 @@ func ParseConfig(cfg *config.Config) error {
// Validate validates the config
func Validate(cfg *config.Config) error {
if cfg.MachineAuthAPIKey == "" {
return shared.MissingMachineAuthApiKeyError(cfg.Service.Name)
}
if cfg.TokenManager.JWTSecret == "" {
return shared.MissingJWTTokenError(cfg.Service.Name)
}
+22 -52
View File
@@ -16,7 +16,6 @@ import (
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/leonelquinteros/gotext"
@@ -52,32 +51,29 @@ type OC10Notification struct {
// Converter is responsible for converting eventhistory events to OC10Notifications
type Converter struct {
locale string
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
serviceAccountID string
serviceAccountSecret string
serviceName string
translationPath string
locale string
gwc gateway.GatewayAPIClient
serviceName string
translationPath string
serviceAccountContext context.Context
// cached within one request not to query other service too much
spaces map[string]*storageprovider.StorageSpace
users map[string]*user.User
resources map[string]*storageprovider.ResourceInfo
serviceAccountContext context.Context
spaces map[string]*storageprovider.StorageSpace
users map[string]*user.User
resources map[string]*storageprovider.ResourceInfo
}
// NewConverter returns a new Converter
func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string, serviceAccountID string, serviceAccountSecret string) *Converter {
func NewConverter(ctx context.Context, loc string, gwc gateway.GatewayAPIClient, name string, translationPath string) *Converter {
return &Converter{
locale: loc,
gatewaySelector: gatewaySelector,
serviceAccountID: serviceAccountID,
serviceAccountSecret: serviceAccountSecret,
serviceName: name,
translationPath: translationPath,
spaces: make(map[string]*storageprovider.StorageSpace),
users: make(map[string]*user.User),
resources: make(map[string]*storageprovider.ResourceInfo),
locale: loc,
gwc: gwc,
serviceName: name,
translationPath: translationPath,
serviceAccountContext: ctx,
spaces: make(map[string]*storageprovider.StorageSpace),
users: make(map[string]*user.User),
resources: make(map[string]*storageprovider.ResourceInfo),
}
}
@@ -172,12 +168,7 @@ func (c *Converter) spaceMessage(eventid string, nt NotificationTemplate, execut
return OC10Notification{}, err
}
ctx, err := c.authenticate()
if err != nil {
return OC10Notification{}, err
}
space, err := c.getSpace(ctx, spaceid)
space, err := c.getSpace(c.serviceAccountContext, spaceid)
if err != nil {
return OC10Notification{}, err
}
@@ -211,12 +202,7 @@ func (c *Converter) shareMessage(eventid string, nt NotificationTemplate, execut
return OC10Notification{}, err
}
ctx, err := c.authenticate()
if err != nil {
return OC10Notification{}, err
}
info, err := c.getResource(ctx, resourceid)
info, err := c.getResource(c.serviceAccountContext, resourceid)
if err != nil {
return OC10Notification{}, err
}
@@ -328,27 +314,11 @@ func (c *Converter) deprovisionMessage(nt NotificationTemplate, deproDate string
}, nil
}
func (c *Converter) authenticate() (context.Context, error) {
if c.serviceAccountContext != nil {
return c.serviceAccountContext, nil
}
gatewayClient, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
ctx, err := utils.GetServiceUserContext(c.serviceAccountID, gatewayClient, c.serviceAccountSecret)
if err == nil {
c.serviceAccountContext = ctx
}
return ctx, err
}
func (c *Converter) getSpace(ctx context.Context, spaceID string) (*storageprovider.StorageSpace, error) {
if space, ok := c.spaces[spaceID]; ok {
return space, nil
}
space, err := getSpace(ctx, spaceID, c.gatewaySelector)
space, err := utils.GetSpace(ctx, spaceID, c.gwc)
if err == nil {
c.spaces[spaceID] = space
}
@@ -359,7 +329,7 @@ func (c *Converter) getResource(ctx context.Context, resourceID *storageprovider
if r, ok := c.resources[resourceID.GetOpaqueId()]; ok {
return r, nil
}
resource, err := getResource(ctx, resourceID, c.gatewaySelector)
resource, err := utils.GetResourceByID(ctx, resourceID, c.gwc)
if err == nil {
c.resources[resourceID.GetOpaqueId()] = resource
}
@@ -370,7 +340,7 @@ func (c *Converter) getUser(ctx context.Context, userID *user.UserId) (*user.Use
if u, ok := c.users[userID.GetOpaqueId()]; ok {
return u, nil
}
usr, err := getUser(ctx, userID, c.gatewaySelector)
usr, err := utils.GetUser(userID, c.gwc)
if err == nil {
c.users[userID.GetOpaqueId()] = usr
}
+16 -1
View File
@@ -8,6 +8,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/roles"
"github.com/owncloud/ocis/v2/services/graph/pkg/service/v0/errorcode"
settings "github.com/owncloud/ocis/v2/services/settings/pkg/service/v0"
@@ -44,7 +45,21 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
Value: attribute.IntValue(len(evs)),
})
conv := ul.getConverter(r.Header.Get(HeaderAcceptLanguage))
gwc, err := ul.gatewaySelector.Next()
if err != nil {
ul.log.Error().Err(err).Msg("cant get gateway client")
w.WriteHeader(http.StatusInternalServerError)
return
}
ctx, err = utils.GetServiceUserContext(ul.cfg.ServiceAccount.ServiceAccountID, gwc, ul.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
ul.log.Error().Err(err).Msg("cant get service account")
w.WriteHeader(http.StatusInternalServerError)
return
}
conv := NewConverter(ctx, r.Header.Get(HeaderAcceptLanguage), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath)
resp := GetEventResponseOC10{}
for _, e := range evs {
+28 -248
View File
@@ -9,10 +9,7 @@ import (
"time"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/utils"
@@ -114,6 +111,18 @@ func (ul *UserlogService) processEvent(event events.Event) {
err error
)
gwc, err := ul.gatewaySelector.Next()
if err != nil {
ul.log.Error().Err(err).Msg("cannot get gateway client")
return
}
ctx, err := utils.GetServiceUserContext(ul.cfg.ServiceAccount.ServiceAccountID, gwc, ul.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
ul.log.Error().Err(err).Msg("cannot get service account")
return
}
switch e := event.Event.(type) {
default:
err = errors.New("unhandled event")
@@ -140,7 +149,7 @@ func (ul *UserlogService) processEvent(event events.Event) {
// space related // TODO: how to find spaceadmins?
case events.SpaceDisabled:
executant = e.Executant
users, err = ul.findSpaceMembers(ul.mustAuthenticate(), e.ID.GetOpaqueId(), viewer)
users, err = utils.GetSpaceMembers(ctx, e.ID.GetOpaqueId(), gwc, utils.ViewerRole)
case events.SpaceDeleted:
executant = e.Executant
for u := range e.FinalMembers {
@@ -148,22 +157,22 @@ func (ul *UserlogService) processEvent(event events.Event) {
}
case events.SpaceShared:
executant = e.Executant
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
case events.SpaceUnshared:
executant = e.Executant
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
case events.SpaceMembershipExpired:
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
// share related
case events.ShareCreated:
executant = e.Executant
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
case events.ShareRemoved:
executant = e.Executant
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
case events.ShareExpired:
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
}
if err != nil {
@@ -180,7 +189,12 @@ func (ul *UserlogService) processEvent(event events.Event) {
// III) store the eventID for each user
for _, id := range users {
if err := ul.addEventToUser(id, event); err != nil {
if !ul.cfg.DisableSSE {
if err := ul.sendSSE(ctx, id, event, gwc); err != nil {
ul.log.Error().Err(err).Str("userid", id).Str("eventid", event.ID).Msg("cannot create sse event")
}
}
if err := ul.addEventToUser(ctx, id, event); err != nil {
ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
return
}
@@ -316,19 +330,14 @@ func (ul *UserlogService) DeleteGlobalEvents(ctx context.Context, evnames []stri
})
}
func (ul *UserlogService) addEventToUser(userid string, event events.Event) error {
if !ul.cfg.DisableSSE {
if err := ul.sendSSE(userid, event); err != nil {
ul.log.Error().Err(err).Str("userid", userid).Str("eventid", event.ID).Msg("cannot create sse event")
}
}
func (ul *UserlogService) addEventToUser(ctx context.Context, userid string, event events.Event) error {
return ul.alterUserEventList(userid, func(ids []string) []string {
return append(ids, event.ID)
})
}
func (ul *UserlogService) sendSSE(userid string, event events.Event) error {
ev, err := ul.getConverter(ul.getUserLocale(userid)).ConvertEvent(event.ID, event.Event)
func (ul *UserlogService) sendSSE(ctx context.Context, userid string, event events.Event, gwc gateway.GatewayAPIClient) error {
ev, err := NewConverter(ctx, ul.getUserLocale(userid), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath).ConvertEvent(event.ID, event.Event)
if err != nil {
return err
}
@@ -419,105 +428,6 @@ func (ul *UserlogService) alterGlobalEvents(ctx context.Context, alter func(map[
})
}
// we need the spaceid to inform other space members
// we need an owner to query space members
// we need to check the user has the required role to see the event
func (ul *UserlogService) findSpaceMembers(ctx context.Context, spaceID string, requiredRole permissionChecker) ([]string, error) {
if ctx == nil {
return nil, errors.New("need authenticated context to find space members")
}
space, err := getSpace(ctx, spaceID, ul.gatewaySelector)
if err != nil {
return nil, err
}
var users []string
switch space.SpaceType {
case "personal":
users = []string{space.GetOwner().GetId().GetOpaqueId()}
case "project":
if users, err = ul.gatherSpaceMembers(ctx, space, requiredRole); err != nil {
return nil, err
}
default:
// TODO: shares? other space types?
return nil, fmt.Errorf("unsupported space type: %s", space.SpaceType)
}
return users, nil
}
func (ul *UserlogService) gatherSpaceMembers(ctx context.Context, space *storageprovider.StorageSpace, hasRequiredRole permissionChecker) ([]string, error) {
var permissionsMap map[string]*storageprovider.ResourcePermissions
if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &permissionsMap); err != nil {
return nil, err
}
groupsMap := make(map[string]struct{})
if opaqueGroups, ok := space.Opaque.Map["groups"]; ok {
_ = json.Unmarshal(opaqueGroups.GetValue(), &groupsMap)
}
// we use a map to avoid duplicates
usermap := make(map[string]struct{})
for id, perm := range permissionsMap {
if !hasRequiredRole(perm) {
// not allowed to receive event
continue
}
if _, isGroup := groupsMap[id]; !isGroup {
usermap[id] = struct{}{}
continue
}
usrs, err := ul.resolveGroup(ctx, id)
if err != nil {
ul.log.Error().Err(err).Str("groupID", id).Msg("failed to resolve group")
continue
}
for _, u := range usrs {
usermap[u] = struct{}{}
}
}
var users []string
for id := range usermap {
users = append(users, id)
}
return users, nil
}
func (ul *UserlogService) resolveID(ctx context.Context, userid *user.UserId, groupid *group.GroupId) ([]string, error) {
if userid != nil {
return []string{userid.GetOpaqueId()}, nil
}
if ctx == nil {
return nil, errors.New("need ctx to resolve group id")
}
return ul.resolveGroup(ctx, groupid.GetOpaqueId())
}
// resolves the users of a group
func (ul *UserlogService) resolveGroup(ctx context.Context, groupID string) ([]string, error) {
grp, err := getGroup(ctx, groupID, ul.gatewaySelector)
if err != nil {
return nil, err
}
var userIDs []string
for _, m := range grp.GetMembers() {
userIDs = append(userIDs, m.GetOpaqueId())
}
return userIDs, nil
}
func (ul *UserlogService) getUserLocale(userid string) string {
resp, err := ul.valueClient.GetValueByUniqueIdentifiers(
micrometadata.Set(context.Background(), middleware.AccountID, userid),
@@ -537,122 +447,6 @@ func (ul *UserlogService) getUserLocale(userid string) string {
return val[0].GetStringValue()
}
func (ul *UserlogService) getConverter(locale string) *Converter {
return NewConverter(locale, ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.ServiceAccount.ServiceAccountID, ul.cfg.ServiceAccount.ServiceAccountSecret)
}
func (ul *UserlogService) mustAuthenticate() context.Context {
ctx, err := authenticate(ul.cfg.ServiceAccount.ServiceAccountID, ul.gatewaySelector, ul.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
ul.log.Error().Err(err).Str("accountid", ul.cfg.ServiceAccount.ServiceAccountID).Msg("failed to impersonate service account")
return nil
}
return ctx
}
func authenticate(serviceAccountID string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], serviceAccountSecret string) (context.Context, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {
return nil, err
}
return utils.GetServiceUserContext(serviceAccountID, gatewayClient, serviceAccountSecret)
}
func getSpace(ctx context.Context, spaceID string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) (*storageprovider.StorageSpace, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {
return nil, err
}
res, err := gatewayClient.ListStorageSpaces(ctx, listStorageSpaceRequest(spaceID))
if err != nil {
return nil, err
}
if res.GetStatus().GetCode() != rpc.Code_CODE_OK {
return nil, fmt.Errorf("error while getting space: (%v) %s", res.GetStatus().GetCode(), res.GetStatus().GetMessage())
}
if len(res.StorageSpaces) == 0 {
return nil, fmt.Errorf("error getting storage space %s: no space returned", spaceID)
}
return res.StorageSpaces[0], nil
}
func getUser(ctx context.Context, userid *user.UserId, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) (*user.User, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {
return nil, err
}
getUserResponse, err := gatewayClient.GetUser(context.Background(), &user.GetUserRequest{
UserId: userid,
})
if err != nil {
return nil, err
}
if getUserResponse.Status.Code != rpc.Code_CODE_OK {
return nil, fmt.Errorf("error getting user: %s", getUserResponse.Status.Message)
}
return getUserResponse.GetUser(), nil
}
func getGroup(ctx context.Context, groupid string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) (*group.Group, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {
return nil, err
}
r, err := gatewayClient.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: groupid}})
if err != nil {
return nil, err
}
if r.GetStatus().GetCode() != rpc.Code_CODE_OK {
return nil, fmt.Errorf("unexpected status code from gateway client: %d", r.GetStatus().GetCode())
}
return r.GetGroup(), nil
}
func getResource(ctx context.Context, resourceid *storageprovider.ResourceId, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) (*storageprovider.ResourceInfo, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {
return nil, err
}
res, err := gatewayClient.Stat(ctx, &storageprovider.StatRequest{Ref: &storageprovider.Reference{ResourceId: resourceid}})
if err != nil {
return nil, err
}
if res.GetStatus().GetCode() != rpc.Code_CODE_OK {
return nil, fmt.Errorf("unexpected status code while getting space: %v", res.GetStatus().GetCode())
}
return res.GetInfo(), nil
}
func listStorageSpaceRequest(spaceID string) *storageprovider.ListStorageSpacesRequest {
return &storageprovider.ListStorageSpacesRequest{
Opaque: utils.AppendPlainToOpaque(nil, "unrestricted", "true"),
Filters: []*storageprovider.ListStorageSpacesRequest_Filter{
{
Type: storageprovider.ListStorageSpacesRequest_Filter_TYPE_ID,
Term: &storageprovider.ListStorageSpacesRequest_Filter_Id{
Id: &storageprovider.StorageSpaceId{
OpaqueId: spaceID,
},
},
},
},
}
}
func removeExecutant(users []string, executant *user.UserId) []string {
var usrs []string
for _, u := range users {
@@ -662,17 +456,3 @@ func removeExecutant(users []string, executant *user.UserId) []string {
}
return usrs
}
type permissionChecker func(*storageprovider.ResourcePermissions) bool
func viewer(perms *storageprovider.ResourcePermissions) bool {
return perms.Stat
}
func editor(perms *storageprovider.ResourcePermissions) bool {
return perms.InitiateFileUpload
}
func manager(perms *storageprovider.ResourcePermissions) bool {
return perms.DenyGrant
}