diff --git a/.drone.star b/.drone.star index 951690839d..dcf05705a0 100644 --- a/.drone.star +++ b/.drone.star @@ -78,6 +78,7 @@ config = { "services/search", "services/settings", "services/sharing", + "services/sse", "services/storage-system", "services/storage-publiclink", "services/storage-shares", diff --git a/Makefile b/Makefile index 92d5b88573..8e648ffc23 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,7 @@ OCIS_MODULES = \ services/search \ services/settings \ services/sharing \ + services/sse \ services/storage-system \ services/storage-publiclink \ services/storage-shares \ diff --git a/services/sse/README.md b/services/sse/README.md index 4c0aabfc0d..ac5e43ae41 100644 --- a/services/sse/README.md +++ b/services/sse/README.md @@ -1,3 +1,7 @@ # SSE -@todo +The `sse` service is responsible for sending sse (Server-Sent Events) to a user. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples of server sent events. + +## Subscribing + +Clients can subscribe to the `/sse` endpoint to be informed by the server when an event happens. The `sse` endpoint will respect language changes of the user without needing to reconnect. Note that SSE has a limitation of six open connections per browser which can be reached if one has opened various tabs of the Web UI pointing to the same Infinite Scale instance. diff --git a/services/sse/pkg/command/server.go b/services/sse/pkg/command/server.go index 7d3d8c32ab..b5da4063da 100644 --- a/services/sse/pkg/command/server.go +++ b/services/sse/pkg/command/server.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/oklog/run" "github.com/urfave/cli/v2" @@ -11,12 +13,18 @@ import ( "github.com/owncloud/ocis/v2/ocis-pkg/handlers" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" + "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" "github.com/owncloud/ocis/v2/services/sse/pkg/config" "github.com/owncloud/ocis/v2/services/sse/pkg/config/parser" - "github.com/owncloud/ocis/v2/services/sse/pkg/service" + "github.com/owncloud/ocis/v2/services/sse/pkg/server/http" ) +// all events we care about +var _registeredEvents = []events.Unmarshaller{ + events.SendSSE{}, +} + // Server is the entrypoint for the server command. func Server(cfg *config.Config) *cli.Command { return &cli.Command{ @@ -45,13 +53,27 @@ func Server(cfg *config.Config) *cli.Command { ) defer cancel() + tracerProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) + if err != nil { + return err + } + { - svc, err := service.NewSSE(cfg, logger) + natsStream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) if err != nil { return err } - gr.Add(svc.Run, func(_ error) { + server, err := http.Server( + http.Logger(logger), + http.Context(ctx), + http.Config(cfg), + http.Consumer(natsStream), + http.RegisteredEvents(_registeredEvents), + http.TracerProvider(tracerProvider), + ) + + gr.Add(server.Run, func(_ error) { cancel() }) } diff --git a/services/sse/pkg/config/config.go b/services/sse/pkg/config/config.go index a03c63f459..b7e9509c48 100644 --- a/services/sse/pkg/config/config.go +++ b/services/sse/pkg/config/config.go @@ -11,11 +11,14 @@ type Config struct { Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service Log *Log - Debug Debug `mask:"struct" yaml:"debug"` + Debug Debug `mask:"struct" yaml:"debug"` + Tracing *Tracing `yaml:"tracing"` Service Service `yaml:"-"` - Events Events + Events Events + HTTP HTTP `yaml:"http"` + TokenManager *TokenManager `yaml:"token_manager"` Context context.Context `yaml:"-" json:"-"` } @@ -49,3 +52,25 @@ type Events struct { TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;SSE_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided SSE_EVENTS_TLS_INSECURE will be seen as false."` EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;SSE_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services."` } + +// CORS defines the available cors configuration. +type CORS struct { + AllowedOrigins []string `yaml:"allow_origins" env:"OCIS_CORS_ALLOW_ORIGINS;SSE_CORS_ALLOW_ORIGINS" desc:"A comma-separated list of allowed CORS origins. See following chapter for more details: *Access-Control-Allow-Origin* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Origin"` + AllowedMethods []string `yaml:"allow_methods" env:"OCIS_CORS_ALLOW_METHODS;SSE_CORS_ALLOW_METHODS" desc:"A comma-separated list of allowed CORS methods. See following chapter for more details: *Access-Control-Request-Method* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Request-Method"` + AllowedHeaders []string `yaml:"allow_headers" env:"OCIS_CORS_ALLOW_HEADERS;SSE_CORS_ALLOW_HEADERS" desc:"A blank or comma-separated list of allowed CORS headers. See following chapter for more details: *Access-Control-Request-Headers* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Request-Headers."` + AllowCredentials bool `yaml:"allow_credentials" env:"OCIS_CORS_ALLOW_CREDENTIALS;SSE_CORS_ALLOW_CREDENTIALS" desc:"Allow credentials for CORS.See following chapter for more details: *Access-Control-Allow-Credentials* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Credentials."` +} + +// HTTP defines the available http configuration. +type HTTP struct { + Addr string `yaml:"addr" env:"SSE_HTTP_ADDR" desc:"The bind address of the HTTP service."` + Namespace string `yaml:"-"` + Root string `yaml:"root" env:"SSE_HTTP_ROOT" desc:"Subdirectory that serves as the root for this HTTP service."` + CORS CORS `yaml:"cors"` + TLS shared.HTTPServiceTLS `yaml:"tls"` +} + +// TokenManager is the config for using the reva token manager +type TokenManager struct { + JWTSecret string `yaml:"jwt_secret" env:"OCIS_JWT_SECRET;SSE_JWT_SECRET" desc:"The secret to mint and validate jwt tokens."` +} diff --git a/services/sse/pkg/config/defaults/defaultconfig.go b/services/sse/pkg/config/defaults/defaultconfig.go index c1e77c262b..0aba9557fc 100644 --- a/services/sse/pkg/config/defaults/defaultconfig.go +++ b/services/sse/pkg/config/defaults/defaultconfig.go @@ -1,6 +1,8 @@ package defaults import ( + "strings" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" ) @@ -26,17 +28,64 @@ func DefaultConfig() *config.Config { Endpoint: "127.0.0.1:9233", Cluster: "ocis-cluster", }, + HTTP: config.HTTP{ + Addr: "127.0.0.1:0", + Root: "/", + Namespace: "com.owncloud.sse", + CORS: config.CORS{ + AllowedOrigins: []string{"*"}, + AllowedMethods: []string{"GET"}, + AllowedHeaders: []string{"Authorization", "Origin", "Content-Type", "Accept", "X-Requested-With", "X-Request-Id", "Ocs-Apirequest"}, + AllowCredentials: true, + }, + }, } } // EnsureDefaults adds default values to the configuration if they are not set yet func EnsureDefaults(cfg *config.Config) { - if cfg.Log == nil { + // provide with defaults for shared logging, since we need a valid destination address for "envdecode". + if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil { + cfg.Log = &config.Log{ + Level: cfg.Commons.Log.Level, + Pretty: cfg.Commons.Log.Pretty, + Color: cfg.Commons.Log.Color, + File: cfg.Commons.Log.File, + } + } else if cfg.Log == nil { cfg.Log = &config.Log{} } + + if cfg.TokenManager == nil && cfg.Commons != nil && cfg.Commons.TokenManager != nil { + cfg.TokenManager = &config.TokenManager{ + JWTSecret: cfg.Commons.TokenManager.JWTSecret, + } + } else if cfg.TokenManager == nil { + cfg.TokenManager = &config.TokenManager{} + } + + if cfg.Commons != nil { + cfg.HTTP.TLS = cfg.Commons.HTTPServiceTLS + } + + // provide with defaults for shared tracing, since we need a valid destination address for "envdecode". + if cfg.Tracing == nil && cfg.Commons != nil && cfg.Commons.Tracing != nil { + cfg.Tracing = &config.Tracing{ + Enabled: cfg.Commons.Tracing.Enabled, + Type: cfg.Commons.Tracing.Type, + Endpoint: cfg.Commons.Tracing.Endpoint, + Collector: cfg.Commons.Tracing.Collector, + } + } else if cfg.Tracing == nil { + cfg.Tracing = &config.Tracing{} + } } // Sanitize sanitizes the configuration func Sanitize(cfg *config.Config) { + // sanitize config + if cfg.HTTP.Root != "/" { + cfg.HTTP.Root = strings.TrimSuffix(cfg.HTTP.Root, "/") + } } diff --git a/services/sse/pkg/config/tracing.go b/services/sse/pkg/config/tracing.go new file mode 100644 index 0000000000..5a177c3f59 --- /dev/null +++ b/services/sse/pkg/config/tracing.go @@ -0,0 +1,21 @@ +package config + +import "github.com/owncloud/ocis/v2/ocis-pkg/tracing" + +// Tracing defines the available tracing configuration. +type Tracing struct { + Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;SSE_TRACING_ENABLED" desc:"Activates tracing."` + Type string `yaml:"type" env:"OCIS_TRACING_TYPE;SSE_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."` + Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;SSE_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."` + Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;SSE_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."` +} + +// Convert Tracing to the tracing package's Config struct. +func (t Tracing) Convert() tracing.Config { + return tracing.Config{ + Enabled: t.Enabled, + Type: t.Type, + Endpoint: t.Endpoint, + Collector: t.Collector, + } +} diff --git a/services/sse/pkg/server/http/option.go b/services/sse/pkg/server/http/option.go new file mode 100644 index 0000000000..3640ba9ea9 --- /dev/null +++ b/services/sse/pkg/server/http/option.go @@ -0,0 +1,76 @@ +package http + +import ( + "context" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" + "go.opentelemetry.io/otel/trace" +) + +// Option defines a single option function. +type Option func(o *Options) + +// Options defines the available options for this package. +type Options struct { + Logger log.Logger + Context context.Context + Config *config.Config + Consumer events.Consumer + RegisteredEvents []events.Unmarshaller + TracerProvider trace.TracerProvider +} + +// newOptions initializes the available default options. +func newOptions(opts ...Option) Options { + opt := Options{} + + for _, o := range opts { + o(&opt) + } + + return opt +} + +// Logger provides a function to set the logger option. +func Logger(val log.Logger) Option { + return func(o *Options) { + o.Logger = val + } +} + +// Context provides a function to set the context option. +func Context(val context.Context) Option { + return func(o *Options) { + o.Context = val + } +} + +// Config provides a function to set the config option. +func Config(val *config.Config) Option { + return func(o *Options) { + o.Config = val + } +} + +// Consumer provides a function to configure the consumer +func Consumer(consumer events.Consumer) Option { + return func(o *Options) { + o.Consumer = consumer + } +} + +// RegisteredEvents provides a function to register events +func RegisteredEvents(evs []events.Unmarshaller) Option { + return func(o *Options) { + o.RegisteredEvents = evs + } +} + +// TracerProvider provides a function to set the TracerProvider option +func TracerProvider(val trace.TracerProvider) Option { + return func(o *Options) { + o.TracerProvider = val + } +} diff --git a/services/sse/pkg/server/http/server.go b/services/sse/pkg/server/http/server.go new file mode 100644 index 0000000000..430ae0db3d --- /dev/null +++ b/services/sse/pkg/server/http/server.go @@ -0,0 +1,96 @@ +package http + +import ( + "fmt" + + stdhttp "net/http" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/go-chi/chi/v5" + chimiddleware "github.com/go-chi/chi/v5/middleware" + "github.com/owncloud/ocis/v2/ocis-pkg/account" + "github.com/owncloud/ocis/v2/ocis-pkg/cors" + "github.com/owncloud/ocis/v2/ocis-pkg/middleware" + "github.com/owncloud/ocis/v2/ocis-pkg/service/http" + "github.com/owncloud/ocis/v2/ocis-pkg/tracing" + "github.com/owncloud/ocis/v2/ocis-pkg/version" + svc "github.com/owncloud/ocis/v2/services/sse/pkg/service" + "github.com/riandyrn/otelchi" + "go-micro.dev/v4" +) + +// Service is the service interface +type Service interface { +} + +// Server initializes the http service and server. +func Server(opts ...Option) (http.Service, error) { + options := newOptions(opts...) + + service, err := http.NewService( + http.TLSConfig(options.Config.HTTP.TLS), + http.Logger(options.Logger), + http.Namespace(options.Config.HTTP.Namespace), + http.Name(options.Config.Service.Name), + http.Version(version.GetString()), + http.Address(options.Config.HTTP.Addr), + http.Context(options.Context), + ) + if err != nil { + options.Logger.Error(). + Err(err). + Msg("Error initializing http service") + return http.Service{}, fmt.Errorf("could not initialize http service: %w", err) + } + + middlewares := []func(stdhttp.Handler) stdhttp.Handler{ + chimiddleware.RequestID, + middleware.Version( + "userlog", + version.GetString(), + ), + middleware.Logger( + options.Logger, + ), + middleware.ExtractAccountUUID( + account.Logger(options.Logger), + account.JWTSecret(options.Config.TokenManager.JWTSecret), + ), + middleware.Cors( + cors.Logger(options.Logger), + cors.AllowedOrigins(options.Config.HTTP.CORS.AllowedOrigins), + cors.AllowedMethods(options.Config.HTTP.CORS.AllowedMethods), + cors.AllowedHeaders(options.Config.HTTP.CORS.AllowedHeaders), + cors.AllowCredentials(options.Config.HTTP.CORS.AllowCredentials), + ), + middleware.Secure, + } + + mux := chi.NewMux() + mux.Use(middlewares...) + + mux.Use( + otelchi.Middleware( + "sse", + otelchi.WithChiRoutes(mux), + otelchi.WithTracerProvider(options.TracerProvider), + otelchi.WithPropagators(tracing.GetPropagator()), + ), + ) + + ch, err := events.Consume(options.Consumer, "sse", options.RegisteredEvents...) + if err != nil { + return http.Service{}, err + } + + handle, err := svc.NewSSE(options.Config, options.Logger, ch, mux) + if err != nil { + return http.Service{}, err + } + + if err := micro.RegisterHandler(service.Server(), handle); err != nil { + return http.Service{}, err + } + + return service, nil +} diff --git a/services/sse/pkg/service/service.go b/services/sse/pkg/service/service.go index 72be9d020a..e0043f2a86 100644 --- a/services/sse/pkg/service/service.go +++ b/services/sse/pkg/service/service.go @@ -1,71 +1,89 @@ package service import ( - "bytes" - "crypto/x509" - "fmt" - "io" "net/http" - "os" + revactx "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/events" - "github.com/cs3org/reva/v2/pkg/events/stream" - "github.com/cs3org/reva/v2/pkg/rhttp" + "github.com/go-chi/chi/v5" + "github.com/r3labs/sse/v2" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/sse/pkg/config" ) +// SSE defines implements the business logic for Service. +type SSE struct { + c *config.Config + l log.Logger + m *chi.Mux + sse *sse.Server + evChannel <-chan events.Event +} + // NewSSE returns a service implementation for Service. -func NewSSE(c *config.Config, l log.Logger) (SSE, error) { - s := SSE{c: c, l: l, client: rhttp.GetHTTPClient(rhttp.Insecure(true))} +func NewSSE(c *config.Config, l log.Logger, ch <-chan events.Event, mux *chi.Mux) (SSE, error) { + s := SSE{ + c: c, + l: l, + m: mux, + sse: sse.New(), + evChannel: ch, + } + mux.Route("/ocs/v2.php/apps/notifications/api/v1/notifications", func(r chi.Router) { + r.Get("/sse", s.HandleSSE) + }) + + go s.ListenForEvents() return s, nil } -// SSE defines implements the business logic for Service. -type SSE struct { - c *config.Config - l log.Logger - m uint64 - - client *http.Client +// ServeHTTP fulfills Handler interface +func (s SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.m.ServeHTTP(w, r) } -// Run runs the service -func (s SSE) Run() error { - evtsCfg := s.c.Events - - var rootCAPool *x509.CertPool - if evtsCfg.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) - if err != nil { - return err +// ListenForEvents listens for events +func (s SSE) ListenForEvents() error { + for e := range s.evChannel { + switch ev := e.Event.(type) { + default: + s.l.Error().Interface("event", ev).Msg("unhandled event") + case events.SendSSE: + s.sse.Publish(ev.UserID, &sse.Event{ + Event: []byte(ev.Type), + Data: ev.Message, + }) } - - var certBytes bytes.Buffer - if _, err := io.Copy(&certBytes, rootCrtFile); err != nil { - return err - } - - rootCAPool = x509.NewCertPool() - rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) - evtsCfg.TLSInsecure = false - } - - natsStream, err := stream.NatsFromConfig(stream.NatsConfig(s.c.Events)) - if err != nil { - return err - } - - ch, err := events.Consume(natsStream, "sse", events.StartPostprocessingStep{}) - if err != nil { - return err - } - - for e := range ch { - fmt.Println(e) // todo } return nil } + +// HandleSSE is the GET handler for events +func (s SSE) HandleSSE(w http.ResponseWriter, r *http.Request) { + u, ok := revactx.ContextGetUser(r.Context()) + if !ok { + s.l.Error().Msg("sse: no user in context") + w.WriteHeader(http.StatusInternalServerError) + return + } + + uid := u.GetId().GetOpaqueId() + if uid == "" { + s.l.Error().Msg("sse: user in context is broken") + w.WriteHeader(http.StatusInternalServerError) + return + } + + stream := s.sse.CreateStream(uid) + stream.AutoReplay = false + + // add stream to URL + q := r.URL.Query() + q.Set("stream", uid) + r.URL.RawQuery = q.Encode() + + s.sse.ServeHTTP(w, r) +} diff --git a/services/userlog/README.md b/services/userlog/README.md index 474fcd07f7..aaaf18f409 100644 --- a/services/userlog/README.md +++ b/services/userlog/README.md @@ -30,10 +30,6 @@ For the time being, the configuration which user related events are of interest The `userlog` service provides an API to retrieve configured events. For now, this API is mostly following the [oc10 notification GET API](https://doc.owncloud.com/server/next/developer_manual/core/apis/ocs-notification-endpoint-v1.html#get-user-notifications). -## Subscribing - -Additionally to the oc10 API, the `userlog` service also provides an `/sse` (Server-Sent Events) endpoint to be informed by the server when an event happens. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples of server sent events. The `sse` endpoint will respect language changes of the user without needing to reconnect. Note that SSE has a limitation of six open connections per browser which can be reached if one has opened various tabs of the Web UI pointing to the same Infinite Scale instance. - ## Posting The userlog service is able to store global messages that will be displayed in the Web UI to all users. If a user deletes the message in the Web UI, it reappears on reload. Global messages use the endpoint `/ocs/v2.php/apps/notifications/api/v1/notifications/global` and are activated by sending a `POST` request. Note that sending another `POST` request of the same type overwrites the previous one. For the time being, only the type `deprovision` is supported.