diff --git a/changelog/unreleased/named-nats-connections.md b/changelog/unreleased/named-nats-connections.md new file mode 100644 index 0000000000..2401b91ee2 --- /dev/null +++ b/changelog/unreleased/named-nats-connections.md @@ -0,0 +1,5 @@ +Enhancement: Nats named connections + +Names the nats connections for easier debugging + +https://github.com/owncloud/ocis/pull/6979 diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index a23d8a54a3..ac03dbcf99 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -87,7 +87,7 @@ func (av Antivirus) Run() error { evtsCfg.TLSInsecure = false } - stream, err := stream.NatsFromConfig(stream.NatsConfig(av.c.Events)) + stream, err := stream.NatsFromConfig(av.c.Service.Name, stream.NatsConfig(av.c.Events)) if err != nil { return err } diff --git a/services/audit/pkg/command/server.go b/services/audit/pkg/command/server.go index f869ce29b1..fe8f270cfc 100644 --- a/services/audit/pkg/command/server.go +++ b/services/audit/pkg/command/server.go @@ -2,17 +2,12 @@ package command import ( "context" - "crypto/tls" - "crypto/x509" "fmt" - "os" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" - "github.com/go-micro/plugins/v4/events/natsjs" "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" - ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto" "github.com/owncloud/ocis/v2/ocis-pkg/handlers" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" "github.com/owncloud/ocis/v2/ocis-pkg/version" @@ -47,39 +42,11 @@ func Server(cfg *config.Config) *cli.Command { ) defer cancel() - evtsCfg := cfg.Events - - var tlsConf *tls.Config - if evtsCfg.EnableTLS { - var rootCAPool *x509.CertPool - if evtsCfg.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) - if err != nil { - return err - } - - rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) - if err != nil { - return err - } - evtsCfg.TLSInsecure = false - } - - tlsConf = &tls.Config{ - MinVersion: tls.VersionTLS12, - InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec - RootCAs: rootCAPool, - } - } - client, err := stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(evtsCfg.Endpoint), - natsjs.ClusterID(evtsCfg.Cluster), - ) + client, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) if err != nil { return err } - evts, err := events.Consume(client, evtsCfg.ConsumerGroup, types.RegisteredEvents()...) + evts, err := events.Consume(client, "audit", types.RegisteredEvents()...) if err != nil { return err } diff --git a/services/audit/pkg/config/config.go b/services/audit/pkg/config/config.go index 5bbdf1f9dd..54384e54ac 100644 --- a/services/audit/pkg/config/config.go +++ b/services/audit/pkg/config/config.go @@ -26,7 +26,6 @@ type Config struct { type Events struct { Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;AUDIT_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."` Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;AUDIT_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."` - ConsumerGroup string `yaml:"group" env:"AUDIT_EVENTS_GROUP" desc:"The consumergroup of the service. One group will only get one copy of an event."` TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;AUDIT_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."` TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;AUDIT_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided AUDIT_EVENTS_TLS_INSECURE will be seen as false."` EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;AUDIT_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."` diff --git a/services/audit/pkg/config/defaults/defaultconfig.go b/services/audit/pkg/config/defaults/defaultconfig.go index 96ef55af55..a85abaec72 100644 --- a/services/audit/pkg/config/defaults/defaultconfig.go +++ b/services/audit/pkg/config/defaults/defaultconfig.go @@ -24,10 +24,9 @@ func DefaultConfig() *config.Config { Name: "audit", }, Events: config.Events{ - Endpoint: "127.0.0.1:9233", - Cluster: "ocis-cluster", - ConsumerGroup: "audit", - EnableTLS: false, + Endpoint: "127.0.0.1:9233", + Cluster: "ocis-cluster", + EnableTLS: false, }, Auditlog: config.Auditlog{ LogToConsole: true, diff --git a/services/eventhistory/pkg/command/server.go b/services/eventhistory/pkg/command/server.go index cee95fd4ae..4ba389f1a1 100644 --- a/services/eventhistory/pkg/command/server.go +++ b/services/eventhistory/pkg/command/server.go @@ -60,7 +60,7 @@ func Server(cfg *config.Config) *cli.Command { metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1) - consumer, err := stream.NatsFromConfig(stream.NatsConfig(cfg.Events)) + consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/graph/pkg/server/http/server.go b/services/graph/pkg/server/http/server.go index 00e29f091f..ddf860356b 100644 --- a/services/graph/pkg/server/http/server.go +++ b/services/graph/pkg/server/http/server.go @@ -1,20 +1,15 @@ package http import ( - "crypto/tls" - "crypto/x509" "fmt" stdhttp "net/http" - "os" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" chimiddleware "github.com/go-chi/chi/v5/middleware" - "github.com/go-micro/plugins/v4/events/natsjs" "github.com/owncloud/ocis/v2/ocis-pkg/account" "github.com/owncloud/ocis/v2/ocis-pkg/cors" - ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto" "github.com/owncloud/ocis/v2/ocis-pkg/keycloak" "github.com/owncloud/ocis/v2/ocis-pkg/middleware" "github.com/owncloud/ocis/v2/ocis-pkg/registry" @@ -57,34 +52,7 @@ func Server(opts ...Option) (http.Service, error) { if options.Config.Events.Endpoint != "" { var err error - - var tlsConf *tls.Config - if options.Config.Events.EnableTLS { - var rootCAPool *x509.CertPool - if options.Config.Events.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(options.Config.Events.TLSRootCACertificate) - if err != nil { - return http.Service{}, err - } - - rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) - if err != nil { - return http.Service{}, err - } - options.Config.Events.TLSInsecure = false - } - - tlsConf = &tls.Config{ - MinVersion: tls.VersionTLS12, - InsecureSkipVerify: options.Config.Events.TLSInsecure, //nolint:gosec - RootCAs: rootCAPool, - } - } - publisher, err = stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(options.Config.Events.Endpoint), - natsjs.ClusterID(options.Config.Events.Cluster), - ) + publisher, err = stream.NatsFromConfig(options.Config.Service.Name, stream.NatsConfig(options.Config.Events)) if err != nil { options.Logger.Error(). Err(err). diff --git a/services/notifications/pkg/command/server.go b/services/notifications/pkg/command/server.go index 7d0e51512e..49466fdf3d 100644 --- a/services/notifications/pkg/command/server.go +++ b/services/notifications/pkg/command/server.go @@ -2,18 +2,13 @@ package command import ( "context" - "crypto/tls" - "crypto/x509" "fmt" - "os" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" - "github.com/go-micro/plugins/v4/events/natsjs" "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" - "github.com/owncloud/ocis/v2/ocis-pkg/crypto" "github.com/owncloud/ocis/v2/ocis-pkg/handlers" "github.com/owncloud/ocis/v2/ocis-pkg/registry" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" @@ -94,40 +89,11 @@ func Server(cfg *config.Config) *cli.Command { events.SpaceUnshared{}, events.SpaceMembershipExpired{}, } - - evtsCfg := cfg.Notifications.Events - - var tlsConf *tls.Config - if evtsCfg.EnableTLS { - var rootCAPool *x509.CertPool - if evtsCfg.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) - if err != nil { - return err - } - - rootCAPool, err = crypto.NewCertPoolFromPEM(rootCrtFile) - if err != nil { - return err - } - evtsCfg.TLSInsecure = false - } - - tlsConf = &tls.Config{ - MinVersion: tls.VersionTLS12, - InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec - RootCAs: rootCAPool, - } - } - client, err := stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(evtsCfg.Endpoint), - natsjs.ClusterID(evtsCfg.Cluster), - ) + client, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Notifications.Events)) if err != nil { return err } - evts, err := events.Consume(client, evtsCfg.ConsumerGroup, evs...) + evts, err := events.Consume(client, "notifications", evs...) if err != nil { return err } diff --git a/services/notifications/pkg/config/config.go b/services/notifications/pkg/config/config.go index 4f34e2de28..0bdc392729 100644 --- a/services/notifications/pkg/config/config.go +++ b/services/notifications/pkg/config/config.go @@ -51,7 +51,6 @@ type SMTP struct { type Events struct { Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;NOTIFICATIONS_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."` Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;NOTIFICATIONS_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."` - ConsumerGroup string `yaml:"group" env:"NOTIFICATIONS_EVENTS_GROUP" desc:"Name of the event group / queue on the event system."` TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;NOTIFICATIONS_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."` TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;NOTIFICATIONS_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided NOTIFICATIONS_EVENTS_TLS_INSECURE will be seen as false."` EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;NOTIFICATIONS_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."` diff --git a/services/notifications/pkg/config/defaults/defaultconfig.go b/services/notifications/pkg/config/defaults/defaultconfig.go index 5f3e1961c1..f643baaa00 100644 --- a/services/notifications/pkg/config/defaults/defaultconfig.go +++ b/services/notifications/pkg/config/defaults/defaultconfig.go @@ -38,10 +38,9 @@ func DefaultConfig() *config.Config { Encryption: "none", }, Events: config.Events{ - Endpoint: "127.0.0.1:9233", - Cluster: "ocis-cluster", - ConsumerGroup: "notifications", - EnableTLS: false, + Endpoint: "127.0.0.1:9233", + Cluster: "ocis-cluster", + EnableTLS: false, }, RevaGateway: shared.DefaultRevaConfig().Address, }, diff --git a/services/policies/pkg/command/server.go b/services/policies/pkg/command/server.go index f9951118b2..8644d11f70 100644 --- a/services/policies/pkg/command/server.go +++ b/services/policies/pkg/command/server.go @@ -2,18 +2,13 @@ package command import ( "context" - "crypto/tls" - "crypto/x509" "fmt" "io" "net/http" - "os" "github.com/cs3org/reva/v2/pkg/events/stream" - "github.com/go-micro/plugins/v4/events/natsjs" "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" - ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" @@ -102,33 +97,8 @@ func Server(cfg *config.Config) *cli.Command { } { - var tlsConf *tls.Config - if cfg.Events.EnableTLS { - var rootCAPool *x509.CertPool - if cfg.Events.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(cfg.Events.TLSRootCACertificate) - if err != nil { - return err - } - - rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) - if err != nil { - return err - } - cfg.Events.TLSInsecure = false - } - - tlsConf = &tls.Config{ - RootCAs: rootCAPool, - } - } - - bus, err := stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(cfg.Events.Endpoint), - natsjs.ClusterID(cfg.Events.Cluster), - ) + bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/postprocessing/pkg/command/postprocessing.go b/services/postprocessing/pkg/command/postprocessing.go index 48848f13fd..c0108af734 100644 --- a/services/postprocessing/pkg/command/postprocessing.go +++ b/services/postprocessing/pkg/command/postprocessing.go @@ -5,6 +5,7 @@ import ( "time" "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" @@ -29,7 +30,7 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - stream, err := getEventBus(cfg.Postprocessing.Events) + stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events)) if err != nil { return err } diff --git a/services/postprocessing/pkg/command/server.go b/services/postprocessing/pkg/command/server.go index fb81fc7147..bf6f15f5b8 100644 --- a/services/postprocessing/pkg/command/server.go +++ b/services/postprocessing/pkg/command/server.go @@ -2,17 +2,12 @@ package command import ( "context" - "crypto/tls" - "crypto/x509" "fmt" "os" - "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/store" - "github.com/go-micro/plugins/v4/events/natsjs" "github.com/oklog/run" - ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto" "github.com/owncloud/ocis/v2/ocis-pkg/handlers" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" "github.com/owncloud/ocis/v2/ocis-pkg/version" @@ -53,7 +48,7 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() { - bus, err := getEventBus(cfg.Postprocessing.Events) + bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events)) if err != nil { return err } @@ -110,32 +105,3 @@ func Server(cfg *config.Config) *cli.Command { }, } } - -func getEventBus(evtsCfg config.Events) (events.Stream, error) { - var tlsConf *tls.Config - if evtsCfg.EnableTLS { - var rootCAPool *x509.CertPool - if evtsCfg.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) - if err != nil { - return nil, err - } - - rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) - if err != nil { - return nil, err - } - evtsCfg.TLSInsecure = false - } - - tlsConf = &tls.Config{ - RootCAs: rootCAPool, - } - } - - return stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(evtsCfg.Endpoint), - natsjs.ClusterID(evtsCfg.Cluster), - ) -} diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index 5d8d7ea118..a3e4cda1da 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -2,11 +2,8 @@ package service import ( "context" - "crypto/tls" - "crypto/x509" "errors" "fmt" - "os" "time" user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" @@ -17,9 +14,7 @@ import ( "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/token" "github.com/cs3org/reva/v2/pkg/token/manager/jwt" - "github.com/go-micro/plugins/v4/events/natsjs" "github.com/jellydator/ttlcache/v2" - ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/ocis-pkg/registry" v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0" @@ -78,33 +73,13 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type) } - var tlsConf *tls.Config - if cfg.Events.EnableTLS { - var rootCAPool *x509.CertPool - if cfg.Events.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(cfg.Events.TLSRootCACertificate) - if err != nil { - return nil, teardown, err - } - - rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) - if err != nil { - return nil, teardown, err - } - cfg.Events.TLSInsecure = false - } - - tlsConf = &tls.Config{ - MinVersion: tls.VersionTLS12, - InsecureSkipVerify: cfg.Events.TLSInsecure, //nolint:gosec - RootCAs: rootCAPool, - } - } - bus, err := stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(cfg.Events.Endpoint), - natsjs.ClusterID(cfg.Events.Cluster), - ) + bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{ + Endpoint: cfg.Events.Endpoint, + Cluster: cfg.Events.Cluster, + EnableTLS: cfg.Events.EnableTLS, + TLSInsecure: cfg.Events.TLSInsecure, + TLSRootCACertificate: cfg.Events.TLSRootCACertificate, + }) if err != nil { return nil, teardown, err } diff --git a/services/sharing/pkg/revaconfig/config.go b/services/sharing/pkg/revaconfig/config.go index ff698a0320..c01e6b4997 100644 --- a/services/sharing/pkg/revaconfig/config.go +++ b/services/sharing/pkg/revaconfig/config.go @@ -123,6 +123,7 @@ func SharingConfigFromStruct(cfg *config.Config) map[string]interface{} { "tls-insecure": cfg.Events.TLSInsecure, "tls-root-ca-cert": cfg.Events.TLSRootCaCertPath, "enable-tls": cfg.Events.EnableTLS, + "name": "sharing-eventsmiddleware", }, "prometheus": map[string]interface{}{ "namespace": "ocis", diff --git a/services/storage-users/pkg/command/server.go b/services/storage-users/pkg/command/server.go index abd06ac172..4f544ce121 100644 --- a/services/storage-users/pkg/command/server.go +++ b/services/storage-users/pkg/command/server.go @@ -89,7 +89,7 @@ func Server(cfg *config.Config) *cli.Command { } { - stream, err := event.NewStream(cfg.Events) + stream, err := event.NewStream(cfg) if err != nil { logger.Fatal().Err(err).Msg("can't connect to nats") } diff --git a/services/storage-users/pkg/command/trash_bin.go b/services/storage-users/pkg/command/trash_bin.go index 0c552a6996..8489ff90bc 100644 --- a/services/storage-users/pkg/command/trash_bin.go +++ b/services/storage-users/pkg/command/trash_bin.go @@ -32,7 +32,7 @@ func PurgeExpiredResources(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - stream, err := event.NewStream(cfg.Events) + stream, err := event.NewStream(cfg) if err != nil { return err } diff --git a/services/storage-users/pkg/event/event.go b/services/storage-users/pkg/event/event.go index bd5273f7dc..957dc71f7b 100644 --- a/services/storage-users/pkg/event/event.go +++ b/services/storage-users/pkg/event/event.go @@ -1,51 +1,18 @@ package event import ( - "crypto/tls" - "crypto/x509" - "os" - "github.com/cs3org/reva/v2/pkg/events/stream" - "github.com/go-micro/plugins/v4/events/natsjs" - ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto" "github.com/owncloud/ocis/v2/services/storage-users/pkg/config" "go-micro.dev/v4/events" ) // NewStream prepares the requested nats stream and returns it. -func NewStream(cfg config.Events) (events.Stream, error) { - var tlsConf *tls.Config - - if cfg.EnableTLS { - var rootCAPool *x509.CertPool - if cfg.TLSRootCaCertPath != "" { - rootCrtFile, err := os.Open(cfg.TLSRootCaCertPath) - if err != nil { - return nil, err - } - - rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) - if err != nil { - return nil, err - } - cfg.TLSInsecure = false - } - - tlsConf = &tls.Config{ - MinVersion: tls.VersionTLS12, - RootCAs: rootCAPool, - } - } - - s, err := stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(cfg.Addr), - natsjs.ClusterID(cfg.ClusterID), - ) - - if err != nil { - return nil, err - } - - return s, nil +func NewStream(cfg *config.Config) (events.Stream, error) { + return stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{ + Endpoint: cfg.Events.Addr, + Cluster: cfg.Events.ClusterID, + EnableTLS: cfg.Events.EnableTLS, + TLSInsecure: cfg.Events.TLSInsecure, + TLSRootCACertificate: cfg.Events.TLSRootCaCertPath, + }) } diff --git a/services/storage-users/pkg/revaconfig/config.go b/services/storage-users/pkg/revaconfig/config.go index dd4980bb4f..ea70cd8500 100644 --- a/services/storage-users/pkg/revaconfig/config.go +++ b/services/storage-users/pkg/revaconfig/config.go @@ -47,6 +47,7 @@ func StorageUsersConfigFromStruct(cfg *config.Config) map[string]interface{} { "tls-insecure": cfg.Events.TLSInsecure, "tls-root-ca-cert": cfg.Events.TLSRootCaCertPath, "enable-tls": cfg.Events.EnableTLS, + "name": "storage-users-eventsmiddleware", }, "prometheus": map[string]interface{}{ "namespace": "ocis", diff --git a/services/userlog/pkg/command/server.go b/services/userlog/pkg/command/server.go index 4c046d5bb2..6002bf4f3e 100644 --- a/services/userlog/pkg/command/server.go +++ b/services/userlog/pkg/command/server.go @@ -81,7 +81,7 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() - consumer, err := stream.NatsFromConfig(stream.NatsConfig(cfg.Events)) + consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) if err != nil { return err }