more functionality for the sse service

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-08-17 15:51:38 +02:00
parent 96292ba4b6
commit 3eb66d8dac
11 changed files with 368 additions and 59 deletions

View File

@@ -78,6 +78,7 @@ config = {
"services/search",
"services/settings",
"services/sharing",
"services/sse",
"services/storage-system",
"services/storage-publiclink",
"services/storage-shares",

View File

@@ -44,6 +44,7 @@ OCIS_MODULES = \
services/search \
services/settings \
services/sharing \
services/sse \
services/storage-system \
services/storage-publiclink \
services/storage-shares \

View File

@@ -1,3 +1,7 @@
# SSE
@todo
The `sse` service is responsible for sending sse (Server-Sent Events) to a user. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples of server sent events.
## Subscribing
Clients can subscribe to the `/sse` endpoint to be informed by the server when an event happens. The `sse` endpoint will respect language changes of the user without needing to reconnect. Note that SSE has a limitation of six open connections per browser which can be reached if one has opened various tabs of the Web UI pointing to the same Infinite Scale instance.

View File

@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/oklog/run"
"github.com/urfave/cli/v2"
@@ -11,12 +13,18 @@ import (
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/sse/pkg/config"
"github.com/owncloud/ocis/v2/services/sse/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/sse/pkg/service"
"github.com/owncloud/ocis/v2/services/sse/pkg/server/http"
)
// all events we care about
var _registeredEvents = []events.Unmarshaller{
events.SendSSE{},
}
// Server is the entrypoint for the server command.
func Server(cfg *config.Config) *cli.Command {
return &cli.Command{
@@ -45,13 +53,27 @@ func Server(cfg *config.Config) *cli.Command {
)
defer cancel()
tracerProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name)
if err != nil {
return err
}
{
svc, err := service.NewSSE(cfg, logger)
natsStream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
gr.Add(svc.Run, func(_ error) {
server, err := http.Server(
http.Logger(logger),
http.Context(ctx),
http.Config(cfg),
http.Consumer(natsStream),
http.RegisteredEvents(_registeredEvents),
http.TracerProvider(tracerProvider),
)
gr.Add(server.Run, func(_ error) {
cancel()
})
}

View File

@@ -11,11 +11,14 @@ type Config struct {
Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service
Log *Log
Debug Debug `mask:"struct" yaml:"debug"`
Debug Debug `mask:"struct" yaml:"debug"`
Tracing *Tracing `yaml:"tracing"`
Service Service `yaml:"-"`
Events Events
Events Events
HTTP HTTP `yaml:"http"`
TokenManager *TokenManager `yaml:"token_manager"`
Context context.Context `yaml:"-" json:"-"`
}
@@ -49,3 +52,25 @@ type Events struct {
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;SSE_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided SSE_EVENTS_TLS_INSECURE will be seen as false."`
EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;SSE_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services."`
}
// CORS defines the available cors configuration.
type CORS struct {
AllowedOrigins []string `yaml:"allow_origins" env:"OCIS_CORS_ALLOW_ORIGINS;SSE_CORS_ALLOW_ORIGINS" desc:"A comma-separated list of allowed CORS origins. See following chapter for more details: *Access-Control-Allow-Origin* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Origin"`
AllowedMethods []string `yaml:"allow_methods" env:"OCIS_CORS_ALLOW_METHODS;SSE_CORS_ALLOW_METHODS" desc:"A comma-separated list of allowed CORS methods. See following chapter for more details: *Access-Control-Request-Method* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Request-Method"`
AllowedHeaders []string `yaml:"allow_headers" env:"OCIS_CORS_ALLOW_HEADERS;SSE_CORS_ALLOW_HEADERS" desc:"A blank or comma-separated list of allowed CORS headers. See following chapter for more details: *Access-Control-Request-Headers* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Request-Headers."`
AllowCredentials bool `yaml:"allow_credentials" env:"OCIS_CORS_ALLOW_CREDENTIALS;SSE_CORS_ALLOW_CREDENTIALS" desc:"Allow credentials for CORS.See following chapter for more details: *Access-Control-Allow-Credentials* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Credentials."`
}
// HTTP defines the available http configuration.
type HTTP struct {
Addr string `yaml:"addr" env:"SSE_HTTP_ADDR" desc:"The bind address of the HTTP service."`
Namespace string `yaml:"-"`
Root string `yaml:"root" env:"SSE_HTTP_ROOT" desc:"Subdirectory that serves as the root for this HTTP service."`
CORS CORS `yaml:"cors"`
TLS shared.HTTPServiceTLS `yaml:"tls"`
}
// TokenManager is the config for using the reva token manager
type TokenManager struct {
JWTSecret string `yaml:"jwt_secret" env:"OCIS_JWT_SECRET;SSE_JWT_SECRET" desc:"The secret to mint and validate jwt tokens."`
}

View File

@@ -1,6 +1,8 @@
package defaults
import (
"strings"
"github.com/owncloud/ocis/v2/services/sse/pkg/config"
)
@@ -26,17 +28,64 @@ func DefaultConfig() *config.Config {
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
},
HTTP: config.HTTP{
Addr: "127.0.0.1:0",
Root: "/",
Namespace: "com.owncloud.sse",
CORS: config.CORS{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET"},
AllowedHeaders: []string{"Authorization", "Origin", "Content-Type", "Accept", "X-Requested-With", "X-Request-Id", "Ocs-Apirequest"},
AllowCredentials: true,
},
},
}
}
// EnsureDefaults adds default values to the configuration if they are not set yet
func EnsureDefaults(cfg *config.Config) {
if cfg.Log == nil {
// provide with defaults for shared logging, since we need a valid destination address for "envdecode".
if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil {
cfg.Log = &config.Log{
Level: cfg.Commons.Log.Level,
Pretty: cfg.Commons.Log.Pretty,
Color: cfg.Commons.Log.Color,
File: cfg.Commons.Log.File,
}
} else if cfg.Log == nil {
cfg.Log = &config.Log{}
}
if cfg.TokenManager == nil && cfg.Commons != nil && cfg.Commons.TokenManager != nil {
cfg.TokenManager = &config.TokenManager{
JWTSecret: cfg.Commons.TokenManager.JWTSecret,
}
} else if cfg.TokenManager == nil {
cfg.TokenManager = &config.TokenManager{}
}
if cfg.Commons != nil {
cfg.HTTP.TLS = cfg.Commons.HTTPServiceTLS
}
// provide with defaults for shared tracing, since we need a valid destination address for "envdecode".
if cfg.Tracing == nil && cfg.Commons != nil && cfg.Commons.Tracing != nil {
cfg.Tracing = &config.Tracing{
Enabled: cfg.Commons.Tracing.Enabled,
Type: cfg.Commons.Tracing.Type,
Endpoint: cfg.Commons.Tracing.Endpoint,
Collector: cfg.Commons.Tracing.Collector,
}
} else if cfg.Tracing == nil {
cfg.Tracing = &config.Tracing{}
}
}
// Sanitize sanitizes the configuration
func Sanitize(cfg *config.Config) {
// sanitize config
if cfg.HTTP.Root != "/" {
cfg.HTTP.Root = strings.TrimSuffix(cfg.HTTP.Root, "/")
}
}

View File

@@ -0,0 +1,21 @@
package config
import "github.com/owncloud/ocis/v2/ocis-pkg/tracing"
// Tracing defines the available tracing configuration.
type Tracing struct {
Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;SSE_TRACING_ENABLED" desc:"Activates tracing."`
Type string `yaml:"type" env:"OCIS_TRACING_TYPE;SSE_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."`
Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;SSE_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."`
Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;SSE_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."`
}
// Convert Tracing to the tracing package's Config struct.
func (t Tracing) Convert() tracing.Config {
return tracing.Config{
Enabled: t.Enabled,
Type: t.Type,
Endpoint: t.Endpoint,
Collector: t.Collector,
}
}

View File

@@ -0,0 +1,76 @@
package http
import (
"context"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/sse/pkg/config"
"go.opentelemetry.io/otel/trace"
)
// Option defines a single option function.
type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {
Logger log.Logger
Context context.Context
Config *config.Config
Consumer events.Consumer
RegisteredEvents []events.Unmarshaller
TracerProvider trace.TracerProvider
}
// newOptions initializes the available default options.
func newOptions(opts ...Option) Options {
opt := Options{}
for _, o := range opts {
o(&opt)
}
return opt
}
// Logger provides a function to set the logger option.
func Logger(val log.Logger) Option {
return func(o *Options) {
o.Logger = val
}
}
// Context provides a function to set the context option.
func Context(val context.Context) Option {
return func(o *Options) {
o.Context = val
}
}
// Config provides a function to set the config option.
func Config(val *config.Config) Option {
return func(o *Options) {
o.Config = val
}
}
// Consumer provides a function to configure the consumer
func Consumer(consumer events.Consumer) Option {
return func(o *Options) {
o.Consumer = consumer
}
}
// RegisteredEvents provides a function to register events
func RegisteredEvents(evs []events.Unmarshaller) Option {
return func(o *Options) {
o.RegisteredEvents = evs
}
}
// TracerProvider provides a function to set the TracerProvider option
func TracerProvider(val trace.TracerProvider) Option {
return func(o *Options) {
o.TracerProvider = val
}
}

View File

@@ -0,0 +1,96 @@
package http
import (
"fmt"
stdhttp "net/http"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/owncloud/ocis/v2/ocis-pkg/account"
"github.com/owncloud/ocis/v2/ocis-pkg/cors"
"github.com/owncloud/ocis/v2/ocis-pkg/middleware"
"github.com/owncloud/ocis/v2/ocis-pkg/service/http"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
svc "github.com/owncloud/ocis/v2/services/sse/pkg/service"
"github.com/riandyrn/otelchi"
"go-micro.dev/v4"
)
// Service is the service interface
type Service interface {
}
// Server initializes the http service and server.
func Server(opts ...Option) (http.Service, error) {
options := newOptions(opts...)
service, err := http.NewService(
http.TLSConfig(options.Config.HTTP.TLS),
http.Logger(options.Logger),
http.Namespace(options.Config.HTTP.Namespace),
http.Name(options.Config.Service.Name),
http.Version(version.GetString()),
http.Address(options.Config.HTTP.Addr),
http.Context(options.Context),
)
if err != nil {
options.Logger.Error().
Err(err).
Msg("Error initializing http service")
return http.Service{}, fmt.Errorf("could not initialize http service: %w", err)
}
middlewares := []func(stdhttp.Handler) stdhttp.Handler{
chimiddleware.RequestID,
middleware.Version(
"userlog",
version.GetString(),
),
middleware.Logger(
options.Logger,
),
middleware.ExtractAccountUUID(
account.Logger(options.Logger),
account.JWTSecret(options.Config.TokenManager.JWTSecret),
),
middleware.Cors(
cors.Logger(options.Logger),
cors.AllowedOrigins(options.Config.HTTP.CORS.AllowedOrigins),
cors.AllowedMethods(options.Config.HTTP.CORS.AllowedMethods),
cors.AllowedHeaders(options.Config.HTTP.CORS.AllowedHeaders),
cors.AllowCredentials(options.Config.HTTP.CORS.AllowCredentials),
),
middleware.Secure,
}
mux := chi.NewMux()
mux.Use(middlewares...)
mux.Use(
otelchi.Middleware(
"sse",
otelchi.WithChiRoutes(mux),
otelchi.WithTracerProvider(options.TracerProvider),
otelchi.WithPropagators(tracing.GetPropagator()),
),
)
ch, err := events.Consume(options.Consumer, "sse", options.RegisteredEvents...)
if err != nil {
return http.Service{}, err
}
handle, err := svc.NewSSE(options.Config, options.Logger, ch, mux)
if err != nil {
return http.Service{}, err
}
if err := micro.RegisterHandler(service.Server(), handle); err != nil {
return http.Service{}, err
}
return service, nil
}

View File

@@ -1,71 +1,89 @@
package service
import (
"bytes"
"crypto/x509"
"fmt"
"io"
"net/http"
"os"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rhttp"
"github.com/go-chi/chi/v5"
"github.com/r3labs/sse/v2"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/sse/pkg/config"
)
// SSE defines implements the business logic for Service.
type SSE struct {
c *config.Config
l log.Logger
m *chi.Mux
sse *sse.Server
evChannel <-chan events.Event
}
// NewSSE returns a service implementation for Service.
func NewSSE(c *config.Config, l log.Logger) (SSE, error) {
s := SSE{c: c, l: l, client: rhttp.GetHTTPClient(rhttp.Insecure(true))}
func NewSSE(c *config.Config, l log.Logger, ch <-chan events.Event, mux *chi.Mux) (SSE, error) {
s := SSE{
c: c,
l: l,
m: mux,
sse: sse.New(),
evChannel: ch,
}
mux.Route("/ocs/v2.php/apps/notifications/api/v1/notifications", func(r chi.Router) {
r.Get("/sse", s.HandleSSE)
})
go s.ListenForEvents()
return s, nil
}
// SSE defines implements the business logic for Service.
type SSE struct {
c *config.Config
l log.Logger
m uint64
client *http.Client
// ServeHTTP fulfills Handler interface
func (s SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.m.ServeHTTP(w, r)
}
// Run runs the service
func (s SSE) Run() error {
evtsCfg := s.c.Events
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return err
// ListenForEvents listens for events
func (s SSE) ListenForEvents() error {
for e := range s.evChannel {
switch ev := e.Event.(type) {
default:
s.l.Error().Interface("event", ev).Msg("unhandled event")
case events.SendSSE:
s.sse.Publish(ev.UserID, &sse.Event{
Event: []byte(ev.Type),
Data: ev.Message,
})
}
var certBytes bytes.Buffer
if _, err := io.Copy(&certBytes, rootCrtFile); err != nil {
return err
}
rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(certBytes.Bytes())
evtsCfg.TLSInsecure = false
}
natsStream, err := stream.NatsFromConfig(stream.NatsConfig(s.c.Events))
if err != nil {
return err
}
ch, err := events.Consume(natsStream, "sse", events.StartPostprocessingStep{})
if err != nil {
return err
}
for e := range ch {
fmt.Println(e) // todo
}
return nil
}
// HandleSSE is the GET handler for events
func (s SSE) HandleSSE(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())
if !ok {
s.l.Error().Msg("sse: no user in context")
w.WriteHeader(http.StatusInternalServerError)
return
}
uid := u.GetId().GetOpaqueId()
if uid == "" {
s.l.Error().Msg("sse: user in context is broken")
w.WriteHeader(http.StatusInternalServerError)
return
}
stream := s.sse.CreateStream(uid)
stream.AutoReplay = false
// add stream to URL
q := r.URL.Query()
q.Set("stream", uid)
r.URL.RawQuery = q.Encode()
s.sse.ServeHTTP(w, r)
}

View File

@@ -30,10 +30,6 @@ For the time being, the configuration which user related events are of interest
The `userlog` service provides an API to retrieve configured events. For now, this API is mostly following the [oc10 notification GET API](https://doc.owncloud.com/server/next/developer_manual/core/apis/ocs-notification-endpoint-v1.html#get-user-notifications).
## Subscribing
Additionally to the oc10 API, the `userlog` service also provides an `/sse` (Server-Sent Events) endpoint to be informed by the server when an event happens. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples of server sent events. The `sse` endpoint will respect language changes of the user without needing to reconnect. Note that SSE has a limitation of six open connections per browser which can be reached if one has opened various tabs of the Web UI pointing to the same Infinite Scale instance.
## Posting
The userlog service is able to store global messages that will be displayed in the Web UI to all users. If a user deletes the message in the Web UI, it reappears on reload. Global messages use the endpoint `/ocs/v2.php/apps/notifications/api/v1/notifications/global` and are activated by sending a `POST` request. Note that sending another `POST` request of the same type overwrites the previous one. For the time being, only the type `deprovision` is supported.