diff --git a/.gitignore b/.gitignore index 0902237f4a..c4c08a62aa 100644 --- a/.gitignore +++ b/.gitignore @@ -49,6 +49,7 @@ protogen/buf.sha1.lock /third-party-licenses # misc +/tmp go.work go.work.sum .env diff --git a/changelog/unreleased/enhancement-sse-messaging.md b/changelog/unreleased/enhancement-sse-messaging.md new file mode 100644 index 0000000000..31c98eb2a0 --- /dev/null +++ b/changelog/unreleased/enhancement-sse-messaging.md @@ -0,0 +1,10 @@ +Enhancement: SSE for messaging + +So far, sse has only been used to exchange messages between the server and the client. +In order to be able to send more content to the client, we have moved the endpoint to a separate service and are now also using it for other notifications like: + +* notify postprocessing state changes. +* notify file locking and unlocking. +* ... @toDo + +https://github.com/owncloud/ocis/pull/6992 diff --git a/docs/services/general-info/port-ranges.md b/docs/services/general-info/port-ranges.md index f2286dadc5..e3baac04d5 100644 --- a/docs/services/general-info/port-ranges.md +++ b/docs/services/general-info/port-ranges.md @@ -30,7 +30,7 @@ We also suggest to use the last port in your extensions' range as a debug/metric | 9120-9124 | [graph]({{< ref "../graph/_index.md" >}}) | | 9125-9129 | [policies]({{< ref "../policies/_index.md" >}}) | | 9130-9134 | [idp]({{< ref "../idp/_index.md" >}}) | -| 9135-9139 | FREE (formerly used by graph-explorer) | +| 9135-9139 | [sse]({{< ref "../sse/_index.md" >}}) | | 9140-9141 | [frontend]({{< ref "../frontend/_index.md" >}}) | | 9142-9143 | [gateway]({{< ref "../gateway/_index.md" >}}) | | 9144-9145 | [users]({{< ref "../users/_index.md" >}}) | diff --git a/docs/services/sse/_index.md b/docs/services/sse/_index.md new file mode 100644 index 0000000000..d839d16de5 --- /dev/null +++ b/docs/services/sse/_index.md @@ -0,0 +1,16 @@ +--- +title: SSE +date: 2022-08-08T00:00:00+00:00 +weight: 20 +geekdocRepo: https://github.com/owncloud/ocis +geekdocEditPath: edit/master/docs/services/sse +geekdocFilePath: _index.md +geekdocCollapseSection: true +--- + +## Abstract + + +## Table of Contents + +{{< toc-tree >}} diff --git a/docs/services/sse/configuration.md b/docs/services/sse/configuration.md new file mode 100644 index 0000000000..902a2b5529 --- /dev/null +++ b/docs/services/sse/configuration.md @@ -0,0 +1,15 @@ +--- +title: Service Configuration +date: 2018-08-08T00:00:00+00:00 +weight: 20 +geekdocRepo: https://github.com/owncloud/ocis +geekdocEditPath: edit/master/docs/services/sse +geekdocFilePath: configuration.md +geekdocCollapseSection: true +--- + +## Example YAML Config + +{{< include file="services/_includes/app-provider-config-example.yaml" language="yaml" >}} + +{{< include file="services/_includes/app-provider_configvars.md" >}} diff --git a/ocis-pkg/config/config.go b/ocis-pkg/config/config.go index 394945fb22..bf16fd6717 100644 --- a/ocis-pkg/config/config.go +++ b/ocis-pkg/config/config.go @@ -27,6 +27,7 @@ import ( search "github.com/owncloud/ocis/v2/services/search/pkg/config" settings "github.com/owncloud/ocis/v2/services/settings/pkg/config" sharing "github.com/owncloud/ocis/v2/services/sharing/pkg/config" + sse "github.com/owncloud/ocis/v2/services/sse/pkg/config" storagepublic "github.com/owncloud/ocis/v2/services/storage-publiclink/pkg/config" storageshares "github.com/owncloud/ocis/v2/services/storage-shares/pkg/config" storagesystem "github.com/owncloud/ocis/v2/services/storage-system/pkg/config" @@ -99,6 +100,7 @@ type Config struct { Proxy *proxy.Config `yaml:"proxy"` Settings *settings.Config `yaml:"settings"` Sharing *sharing.Config `yaml:"sharing"` + SSE *sse.Config `yaml:"sse"` StorageSystem *storagesystem.Config `yaml:"storage_system"` StoragePublicLink *storagepublic.Config `yaml:"storage_public"` StorageShares *storageshares.Config `yaml:"storage_shares"` diff --git a/ocis-pkg/config/defaultconfig.go b/ocis-pkg/config/defaultconfig.go index 3afd0896c8..bfa03aee63 100644 --- a/ocis-pkg/config/defaultconfig.go +++ b/ocis-pkg/config/defaultconfig.go @@ -26,6 +26,7 @@ import ( search "github.com/owncloud/ocis/v2/services/search/pkg/config/defaults" settings "github.com/owncloud/ocis/v2/services/settings/pkg/config/defaults" sharing "github.com/owncloud/ocis/v2/services/sharing/pkg/config/defaults" + sse "github.com/owncloud/ocis/v2/services/sse/pkg/config/defaults" storagepublic "github.com/owncloud/ocis/v2/services/storage-publiclink/pkg/config/defaults" storageshares "github.com/owncloud/ocis/v2/services/storage-shares/pkg/config/defaults" storageSystem "github.com/owncloud/ocis/v2/services/storage-system/pkg/config/defaults" @@ -72,6 +73,7 @@ func DefaultConfig() *Config { Search: search.FullDefaultConfig(), Settings: settings.DefaultConfig(), Sharing: sharing.DefaultConfig(), + SSE: sse.DefaultConfig(), StoragePublicLink: storagepublic.DefaultConfig(), StorageShares: storageshares.DefaultConfig(), StorageSystem: storageSystem.DefaultConfig(), diff --git a/ocis/pkg/command/services.go b/ocis/pkg/command/services.go index 960abd452d..df143b623e 100644 --- a/ocis/pkg/command/services.go +++ b/ocis/pkg/command/services.go @@ -1,13 +1,13 @@ package command import ( + "github.com/urfave/cli/v2" + "github.com/owncloud/ocis/v2/ocis-pkg/config" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/ocis-pkg/config/parser" "github.com/owncloud/ocis/v2/ocis/pkg/command/helper" "github.com/owncloud/ocis/v2/ocis/pkg/register" - "github.com/urfave/cli/v2" - antivirus "github.com/owncloud/ocis/v2/services/antivirus/pkg/command" appprovider "github.com/owncloud/ocis/v2/services/app-provider/pkg/command" appregistry "github.com/owncloud/ocis/v2/services/app-registry/pkg/command" @@ -33,6 +33,7 @@ import ( search "github.com/owncloud/ocis/v2/services/search/pkg/command" settings "github.com/owncloud/ocis/v2/services/settings/pkg/command" sharing "github.com/owncloud/ocis/v2/services/sharing/pkg/command" + sse "github.com/owncloud/ocis/v2/services/sse/pkg/command" storagepubliclink "github.com/owncloud/ocis/v2/services/storage-publiclink/pkg/command" storageshares "github.com/owncloud/ocis/v2/services/storage-shares/pkg/command" storagesystem "github.com/owncloud/ocis/v2/services/storage-system/pkg/command" @@ -172,6 +173,11 @@ var svccmds = []register.Command{ cfg.Sharing.Commons = cfg.Commons }) }, + func(cfg *config.Config) *cli.Command { + return ServiceCommand(cfg, cfg.SSE.Service.Name, sse.GetCommands(cfg.SSE), func(c *config.Config) { + cfg.SSE.Commons = cfg.Commons + }) + }, func(cfg *config.Config) *cli.Command { return ServiceCommand(cfg, cfg.StoragePublicLink.Service.Name, storagepubliclink.GetCommands(cfg.StoragePublicLink), func(c *config.Config) { cfg.StoragePublicLink.Commons = cfg.Commons diff --git a/ocis/pkg/runtime/service/service.go b/ocis/pkg/runtime/service/service.go index 958d2d1671..027ef7e551 100644 --- a/ocis/pkg/runtime/service/service.go +++ b/ocis/pkg/runtime/service/service.go @@ -15,6 +15,8 @@ import ( "github.com/mohae/deepcopy" "github.com/olekukonko/tablewriter" + "github.com/thejerf/suture/v4" + ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/ocis-pkg/shared" @@ -42,6 +44,7 @@ import ( search "github.com/owncloud/ocis/v2/services/search/pkg/command" settings "github.com/owncloud/ocis/v2/services/settings/pkg/command" sharing "github.com/owncloud/ocis/v2/services/sharing/pkg/command" + sse "github.com/owncloud/ocis/v2/services/sse/pkg/command" storagepublic "github.com/owncloud/ocis/v2/services/storage-publiclink/pkg/command" storageshares "github.com/owncloud/ocis/v2/services/storage-shares/pkg/command" storageSystem "github.com/owncloud/ocis/v2/services/storage-system/pkg/command" @@ -53,7 +56,6 @@ import ( web "github.com/owncloud/ocis/v2/services/web/pkg/command" webdav "github.com/owncloud/ocis/v2/services/webdav/pkg/command" webfinger "github.com/owncloud/ocis/v2/services/webfinger/pkg/command" - "github.com/thejerf/suture/v4" ) var ( @@ -298,11 +300,16 @@ func NewService(options ...Option) (*Service, error) { cfg.Sharing.Commons = cfg.Commons return sharing.Execute(cfg.Sharing) }) + dreg(opts.Config.SSE.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error { + cfg.SSE.Context = ctx + cfg.SSE.Commons = cfg.Commons + return sse.Execute(cfg.SSE) + }) return s, nil } -// Start an rpc service. By default the package scope Start will run all default services to provide with a working +// Start a rpc service. By default, the package scope Start will run all default services to provide with a working // oCIS instance. func Start(o ...Option) error { // Start the runtime. Most likely this was called ONLY by the `ocis server` subcommand, but since we cannot protect diff --git a/services/sse/Makefile b/services/sse/Makefile new file mode 100644 index 0000000000..5381f8cfaf --- /dev/null +++ b/services/sse/Makefile @@ -0,0 +1,37 @@ +SHELL := bash +NAME := sse + +include ../../.make/recursion.mk + +############ tooling ############ +ifneq (, $(shell command -v go 2> /dev/null)) # suppress `command not found warnings` for non go targets in CI +include ../../.bingo/Variables.mk +endif + +############ go tooling ############ +include ../../.make/go.mk + +############ release ############ +include ../../.make/release.mk + +############ docs generate ############ +include ../../.make/docs.mk + +.PHONY: docs-generate +docs-generate: config-docs-generate + +############ generate ############ +include ../../.make/generate.mk + +.PHONY: ci-go-generate +ci-go-generate: # CI runs ci-node-generate automatically before this target + +.PHONY: ci-node-generate +ci-node-generate: + +############ licenses ############ +.PHONY: ci-node-check-licenses +ci-node-check-licenses: + +.PHONY: ci-node-save-licenses +ci-node-save-licenses: diff --git a/services/sse/README.md b/services/sse/README.md new file mode 100644 index 0000000000..4c0aabfc0d --- /dev/null +++ b/services/sse/README.md @@ -0,0 +1,3 @@ +# SSE + +@todo diff --git a/services/sse/cmd/sse/main.go b/services/sse/cmd/sse/main.go new file mode 100644 index 0000000000..8bd438f114 --- /dev/null +++ b/services/sse/cmd/sse/main.go @@ -0,0 +1,14 @@ +package main + +import ( + "os" + + "github.com/owncloud/ocis/v2/services/sse/pkg/command" + "github.com/owncloud/ocis/v2/services/sse/pkg/config/defaults" +) + +func main() { + if err := command.Execute(defaults.DefaultConfig()); err != nil { + os.Exit(1) + } +} diff --git a/services/sse/pkg/command/health.go b/services/sse/pkg/command/health.go new file mode 100644 index 0000000000..195dceade2 --- /dev/null +++ b/services/sse/pkg/command/health.go @@ -0,0 +1,62 @@ +package command + +import ( + "fmt" + "net/http" + + "github.com/owncloud/ocis/v2/ocis-pkg/log" + + "github.com/urfave/cli/v2" + + "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" + "github.com/owncloud/ocis/v2/services/sse/pkg/config/parser" +) + +// Health is the entrypoint for the health command. +func Health(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "health", + Usage: "check health status", + Category: "info", + Before: func(c *cli.Context) error { + return configlog.ReturnError(parser.ParseConfig(cfg)) + }, + Action: func(c *cli.Context) error { + logger := log.NewLogger( + log.Name(cfg.Service.Name), + log.Level(cfg.Log.Level), + log.Pretty(cfg.Log.Pretty), + log.Color(cfg.Log.Color), + log.File(cfg.Log.File), + ) + + resp, err := http.Get( + fmt.Sprintf( + "http://%s/healthz", + cfg.Debug.Addr, + ), + ) + + if err != nil { + logger.Fatal(). + Err(err). + Msg("Failed to request health check") + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + logger.Fatal(). + Int("code", resp.StatusCode). + Msg("Health seems to be in bad state") + } + + logger.Debug(). + Int("code", resp.StatusCode). + Msg("Health got a good state") + + return nil + }, + } +} diff --git a/services/sse/pkg/command/root.go b/services/sse/pkg/command/root.go new file mode 100644 index 0000000000..f3a6e4b276 --- /dev/null +++ b/services/sse/pkg/command/root.go @@ -0,0 +1,30 @@ +package command + +import ( + "os" + + "github.com/urfave/cli/v2" + + "github.com/owncloud/ocis/v2/ocis-pkg/clihelper" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" +) + +// GetCommands provides all commands for this service +func GetCommands(cfg *config.Config) cli.Commands { + return []*cli.Command{ + Server(cfg), + Health(cfg), + Version(cfg), + } +} + +// Execute is the entry point for the sse command. +func Execute(cfg *config.Config) error { + app := clihelper.DefaultApp(&cli.App{ + Name: "sse", + Usage: "Serve ownCloud sse for oCIS", + Commands: GetCommands(cfg), + }) + + return app.Run(os.Args) +} diff --git a/services/sse/pkg/command/server.go b/services/sse/pkg/command/server.go new file mode 100644 index 0000000000..7d3d8c32ab --- /dev/null +++ b/services/sse/pkg/command/server.go @@ -0,0 +1,81 @@ +package command + +import ( + "context" + "fmt" + + "github.com/oklog/run" + "github.com/urfave/cli/v2" + + "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/ocis-pkg/handlers" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" + "github.com/owncloud/ocis/v2/ocis-pkg/version" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" + "github.com/owncloud/ocis/v2/services/sse/pkg/config/parser" + "github.com/owncloud/ocis/v2/services/sse/pkg/service" +) + +// Server is the entrypoint for the server command. +func Server(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "server", + Usage: fmt.Sprintf("start the %s service without runtime (unsupervised mode)", cfg.Service.Name), + Category: "server", + Before: func(c *cli.Context) error { + return configlog.ReturnFatal(parser.ParseConfig(cfg)) + }, + Action: func(c *cli.Context) error { + var ( + gr = run.Group{} + ctx, cancel = func() (context.Context, context.CancelFunc) { + if cfg.Context == nil { + return context.WithCancel(context.Background()) + } + return context.WithCancel(cfg.Context) + }() + logger = log.NewLogger( + log.Name(cfg.Service.Name), + log.Level(cfg.Log.Level), + log.Pretty(cfg.Log.Pretty), + log.Color(cfg.Log.Color), + log.File(cfg.Log.File), + ) + ) + defer cancel() + + { + svc, err := service.NewSSE(cfg, logger) + if err != nil { + return err + } + + gr.Add(svc.Run, func(_ error) { + cancel() + }) + } + + { + server := debug.NewService( + debug.Logger(logger), + debug.Name(cfg.Service.Name), + debug.Version(version.GetString()), + debug.Address(cfg.Debug.Addr), + debug.Token(cfg.Debug.Token), + debug.Pprof(cfg.Debug.Pprof), + debug.Zpages(cfg.Debug.Zpages), + debug.Health(handlers.Health), + debug.Ready(handlers.Ready), + ) + + gr.Add(server.ListenAndServe, func(_ error) { + _ = server.Shutdown(ctx) + cancel() + }) + } + + return gr.Run() + }, + } +} diff --git a/services/sse/pkg/command/version.go b/services/sse/pkg/command/version.go new file mode 100644 index 0000000000..fe3b344f0f --- /dev/null +++ b/services/sse/pkg/command/version.go @@ -0,0 +1,27 @@ +package command + +import ( + "fmt" + + "github.com/owncloud/ocis/v2/ocis-pkg/version" + + "github.com/urfave/cli/v2" + + "github.com/owncloud/ocis/v2/services/sse/pkg/config" +) + +// Version prints the service versions of all running instances. +func Version(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "version", + Usage: "print the version of this binary and the running service instances", + Category: "info", + Action: func(c *cli.Context) error { + fmt.Println("Version: " + version.GetString()) + fmt.Printf("Compiled: %s\n", version.Compiled()) + fmt.Println("") + + return nil + }, + } +} diff --git a/services/sse/pkg/config/config.go b/services/sse/pkg/config/config.go new file mode 100644 index 0000000000..a03c63f459 --- /dev/null +++ b/services/sse/pkg/config/config.go @@ -0,0 +1,51 @@ +package config + +import ( + "context" + + "github.com/owncloud/ocis/v2/ocis-pkg/shared" +) + +// Config combines all available configuration parts. +type Config struct { + Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service + Log *Log + + Debug Debug `mask:"struct" yaml:"debug"` + + Service Service `yaml:"-"` + + Events Events + + Context context.Context `yaml:"-" json:"-"` +} + +// Service defines the available service configuration. +type Service struct { + Name string `yaml:"-"` +} + +// Log defines the available log configuration. +type Log struct { + Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;SSE_LOG_LEVEL" desc:"The log level. Valid values are: 'panic', 'fatal', 'error', 'warn', 'info', 'debug', 'trace'."` + Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;SSE_LOG_PRETTY" desc:"Activates pretty log output."` + Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;SSE_LOG_COLOR" desc:"Activates colorized log output."` + File string `mapstructure:"file" env:"OCIS_LOG_FILE;SSE_LOG_FILE" desc:"The path to the log file. Activates logging to this file if set."` +} + +// Debug defines the available debug configuration. +type Debug struct { + Addr string `yaml:"addr" env:"SSE_DEBUG_ADDR" desc:"Bind address of the debug server, where metrics, health, config and debug endpoints will be exposed."` + Token string `yaml:"token" env:"SSE_DEBUG_TOKEN" desc:"Token to secure the metrics endpoint."` + Pprof bool `yaml:"pprof" env:"SSE_DEBUG_PPROF" desc:"Enables pprof, which can be used for profiling."` + Zpages bool `yaml:"zpages" env:"SSE_DEBUG_ZPAGES" desc:"Enables zpages, which can be used for collecting and viewing in-memory traces."` +} + +// Events combines the configuration options for the event bus. +type Events struct { + Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;SSE_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."` + Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;SSE_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."` + TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;SSE_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."` + TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;SSE_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided SSE_EVENTS_TLS_INSECURE will be seen as false."` + EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;SSE_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services."` +} diff --git a/services/sse/pkg/config/defaults/defaultconfig.go b/services/sse/pkg/config/defaults/defaultconfig.go new file mode 100644 index 0000000000..c1e77c262b --- /dev/null +++ b/services/sse/pkg/config/defaults/defaultconfig.go @@ -0,0 +1,42 @@ +package defaults + +import ( + "github.com/owncloud/ocis/v2/services/sse/pkg/config" +) + +// FullDefaultConfig returns a fully initialized default configuration which is needed for doc generation. +func FullDefaultConfig() *config.Config { + cfg := DefaultConfig() + EnsureDefaults(cfg) + Sanitize(cfg) + return cfg +} + +// DefaultConfig returns the services default config +func DefaultConfig() *config.Config { + return &config.Config{ + Debug: config.Debug{ + Addr: "127.0.0.1:9135", + Token: "", + }, + Service: config.Service{ + Name: "sse", + }, + Events: config.Events{ + Endpoint: "127.0.0.1:9233", + Cluster: "ocis-cluster", + }, + } +} + +// EnsureDefaults adds default values to the configuration if they are not set yet +func EnsureDefaults(cfg *config.Config) { + if cfg.Log == nil { + cfg.Log = &config.Log{} + } +} + +// Sanitize sanitizes the configuration +func Sanitize(cfg *config.Config) { + +} diff --git a/services/sse/pkg/config/parser/parse.go b/services/sse/pkg/config/parser/parse.go new file mode 100644 index 0000000000..dc9375210d --- /dev/null +++ b/services/sse/pkg/config/parser/parse.go @@ -0,0 +1,38 @@ +package parser + +import ( + "errors" + + ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" + "github.com/owncloud/ocis/v2/services/sse/pkg/config/defaults" + + "github.com/owncloud/ocis/v2/ocis-pkg/config/envdecode" +) + +// ParseConfig loads configuration from known paths. +func ParseConfig(cfg *config.Config) error { + _, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg) + if err != nil { + return err + } + + defaults.EnsureDefaults(cfg) + + // load all env variables relevant to the config in the current context. + if err := envdecode.Decode(cfg); err != nil { + // no environment variable set for this config is an expected "error" + if !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) { + return err + } + } + + defaults.Sanitize(cfg) + + return Validate(cfg) +} + +// Validate validates our little config +func Validate(cfg *config.Config) error { + return nil +} diff --git a/services/sse/pkg/service/service.go b/services/sse/pkg/service/service.go new file mode 100644 index 0000000000..72be9d020a --- /dev/null +++ b/services/sse/pkg/service/service.go @@ -0,0 +1,71 @@ +package service + +import ( + "bytes" + "crypto/x509" + "fmt" + "io" + "net/http" + "os" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/stream" + "github.com/cs3org/reva/v2/pkg/rhttp" + + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" +) + +// NewSSE returns a service implementation for Service. +func NewSSE(c *config.Config, l log.Logger) (SSE, error) { + s := SSE{c: c, l: l, client: rhttp.GetHTTPClient(rhttp.Insecure(true))} + + return s, nil +} + +// SSE defines implements the business logic for Service. +type SSE struct { + c *config.Config + l log.Logger + m uint64 + + client *http.Client +} + +// Run runs the service +func (s SSE) Run() error { + evtsCfg := s.c.Events + + var rootCAPool *x509.CertPool + if evtsCfg.TLSRootCACertificate != "" { + rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) + if err != nil { + return err + } + + var certBytes bytes.Buffer + if _, err := io.Copy(&certBytes, rootCrtFile); err != nil { + return err + } + + rootCAPool = x509.NewCertPool() + rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) + evtsCfg.TLSInsecure = false + } + + natsStream, err := stream.NatsFromConfig(stream.NatsConfig(s.c.Events)) + if err != nil { + return err + } + + ch, err := events.Consume(natsStream, "sse", events.StartPostprocessingStep{}) + if err != nil { + return err + } + + for e := range ch { + fmt.Println(e) // todo + } + + return nil +}