Merge pull request #6979 from kobergj/NatsNamedConnections

[full-ci] Name nats connections
This commit is contained in:
kobergj
2023-08-16 11:16:37 +02:00
committed by GitHub
32 changed files with 114 additions and 428 deletions

View File

@@ -0,0 +1,5 @@
Enhancement: Bump reva to latest edge
bumps reva to latest edge
https://github.com/owncloud/ocis/pull/6979

View File

@@ -0,0 +1,5 @@
Enhancement: Nats named connections
Names the nats connections for easier debugging
https://github.com/owncloud/ocis/pull/6979

6
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.6.0
github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d
github.com/cs3org/reva/v2 v2.15.1-0.20230810092810-8d195c7859c7
github.com/cs3org/reva/v2 v2.15.1-0.20230816081257-e3a2be91bc4f
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
github.com/egirna/icap-client v0.1.1
@@ -25,7 +25,6 @@ require (
github.com/go-ldap/ldap/v3 v3.4.5
github.com/go-ldap/ldif v0.0.0-20200320164324-fd88d9b715b3
github.com/go-micro/plugins/v4/client/grpc v1.2.0
github.com/go-micro/plugins/v4/events/natsjs v1.2.1
github.com/go-micro/plugins/v4/logger/zerolog v1.2.0
github.com/go-micro/plugins/v4/registry/consul v1.2.1
github.com/go-micro/plugins/v4/registry/etcd v1.2.0
@@ -187,6 +186,7 @@ require (
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7 // indirect
github.com/go-micro/plugins/v4/store/nats-js v1.1.0 // indirect
github.com/go-micro/plugins/v4/store/redis v1.2.1-0.20230510195111-07cd57e1bc9d // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
@@ -337,5 +337,3 @@ require (
)
replace github.com/cs3org/go-cs3apis => github.com/2403905/go-cs3apis v0.0.0-20230517122726-727045414fd1
// replace github.com/cs3org/reva/v2 => github.com/micbar/reva/v2 v2.0.0-20230626125956-c381fe19a108

8
go.sum
View File

@@ -864,8 +864,8 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo
github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4=
github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc=
github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA=
github.com/cs3org/reva/v2 v2.15.1-0.20230810092810-8d195c7859c7 h1:Rc0YEihSvHXIO3UnRxErsi2uHqhk8/7AMdGGg4NERZ4=
github.com/cs3org/reva/v2 v2.15.1-0.20230810092810-8d195c7859c7/go.mod h1:F5wAUTPMvq+ze77PU/xl7qhc21YsEIfcl2RuI4H7yJo=
github.com/cs3org/reva/v2 v2.15.1-0.20230816081257-e3a2be91bc4f h1:s0sBJbIB8atyhujVx/OaadujuRHer8ODPpWxyGWfw/s=
github.com/cs3org/reva/v2 v2.15.1-0.20230816081257-e3a2be91bc4f/go.mod h1:6GyXffmxluCqQxXaYuVC2Dg10gj0QW199iVlxV0EAJg=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -1030,8 +1030,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-micro/plugins/v4/client/grpc v1.2.0 h1:Z8BB6jqslXM2aMMhjZ+QfNuzR+msCMtGd83DGlsQQG0=
github.com/go-micro/plugins/v4/client/grpc v1.2.0/go.mod h1:3fDuzyfYLwEImn8+lkhKl3W4Ay1jFevkTeC32PBlgQs=
github.com/go-micro/plugins/v4/events/natsjs v1.2.1 h1:wCq5pyUkHSJ31QAtTLiIRowNMe2OFQysg7fjw3SPJZ4=
github.com/go-micro/plugins/v4/events/natsjs v1.2.1/go.mod h1:lYuiEYKQTpbE2LA8HEcC8D6kQ29M7ILfEak3dzeucEg=
github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7 h1:/RpJVLKmKT2OcEnKCPaS6n+zygNzYDzwoYgPQEgcEiQ=
github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7/go.mod h1:lYuiEYKQTpbE2LA8HEcC8D6kQ29M7ILfEak3dzeucEg=
github.com/go-micro/plugins/v4/logger/zerolog v1.2.0 h1:JZ516VQ9zekRoi868XG7x0EWxZ2AMq/euHIBChITsTI=
github.com/go-micro/plugins/v4/logger/zerolog v1.2.0/go.mod h1:AieYOIeOxobYa5B8WGEqxXM3Ndi26tDIu9fZ4RYkCvQ=
github.com/go-micro/plugins/v4/registry/consul v1.2.1 h1:3wctYMtstwQLCjoJ1HA6mKGGFF1hcdKDv5MzHakB1jE=

View File

@@ -87,7 +87,7 @@ func (av Antivirus) Run() error {
evtsCfg.TLSInsecure = false
}
stream, err := stream.NatsFromConfig(stream.NatsConfig(av.c.Events))
stream, err := stream.NatsFromConfig(av.c.Service.Name, stream.NatsConfig(av.c.Events))
if err != nil {
return err
}

View File

@@ -2,17 +2,12 @@ package command
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
@@ -47,39 +42,11 @@ func Server(cfg *config.Config) *cli.Command {
)
defer cancel()
evtsCfg := cfg.Events
var tlsConf *tls.Config
if evtsCfg.EnableTLS {
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return err
}
evtsCfg.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
client, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
client, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
evts, err := events.Consume(client, evtsCfg.ConsumerGroup, types.RegisteredEvents()...)
evts, err := events.Consume(client, "audit", types.RegisteredEvents()...)
if err != nil {
return err
}

View File

@@ -26,7 +26,6 @@ type Config struct {
type Events struct {
Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;AUDIT_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;AUDIT_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
ConsumerGroup string `yaml:"group" env:"AUDIT_EVENTS_GROUP" desc:"The consumergroup of the service. One group will only get one copy of an event."`
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;AUDIT_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;AUDIT_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided AUDIT_EVENTS_TLS_INSECURE will be seen as false."`
EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;AUDIT_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."`

View File

@@ -24,10 +24,9 @@ func DefaultConfig() *config.Config {
Name: "audit",
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
ConsumerGroup: "audit",
EnableTLS: false,
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
EnableTLS: false,
},
Auditlog: config.Auditlog{
LogToConsole: true,

View File

@@ -60,7 +60,7 @@ func Server(cfg *config.Config) *cli.Command {
metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1)
consumer, err := stream.NatsFromConfig(stream.NatsConfig(cfg.Events))
consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}

View File

@@ -1,20 +1,15 @@
package http
import (
"crypto/tls"
"crypto/x509"
"fmt"
stdhttp "net/http"
"os"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/owncloud/ocis/v2/ocis-pkg/account"
"github.com/owncloud/ocis/v2/ocis-pkg/cors"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/keycloak"
"github.com/owncloud/ocis/v2/ocis-pkg/middleware"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
@@ -57,34 +52,7 @@ func Server(opts ...Option) (http.Service, error) {
if options.Config.Events.Endpoint != "" {
var err error
var tlsConf *tls.Config
if options.Config.Events.EnableTLS {
var rootCAPool *x509.CertPool
if options.Config.Events.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(options.Config.Events.TLSRootCACertificate)
if err != nil {
return http.Service{}, err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return http.Service{}, err
}
options.Config.Events.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: options.Config.Events.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
publisher, err = stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(options.Config.Events.Endpoint),
natsjs.ClusterID(options.Config.Events.Cluster),
)
publisher, err = stream.NatsFromConfig(options.Config.Service.Name, stream.NatsConfig(options.Config.Events))
if err != nil {
options.Logger.Error().
Err(err).

View File

@@ -2,18 +2,13 @@ package command
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
@@ -94,40 +89,11 @@ func Server(cfg *config.Config) *cli.Command {
events.SpaceUnshared{},
events.SpaceMembershipExpired{},
}
evtsCfg := cfg.Notifications.Events
var tlsConf *tls.Config
if evtsCfg.EnableTLS {
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return err
}
rootCAPool, err = crypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return err
}
evtsCfg.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
client, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
client, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Notifications.Events))
if err != nil {
return err
}
evts, err := events.Consume(client, evtsCfg.ConsumerGroup, evs...)
evts, err := events.Consume(client, "notifications", evs...)
if err != nil {
return err
}

View File

@@ -51,7 +51,6 @@ type SMTP struct {
type Events struct {
Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;NOTIFICATIONS_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;NOTIFICATIONS_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
ConsumerGroup string `yaml:"group" env:"NOTIFICATIONS_EVENTS_GROUP" desc:"Name of the event group / queue on the event system."`
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;NOTIFICATIONS_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;NOTIFICATIONS_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided NOTIFICATIONS_EVENTS_TLS_INSECURE will be seen as false."`
EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;NOTIFICATIONS_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."`

View File

@@ -38,10 +38,9 @@ func DefaultConfig() *config.Config {
Encryption: "none",
},
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
ConsumerGroup: "notifications",
EnableTLS: false,
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
EnableTLS: false,
},
RevaGateway: shared.DefaultRevaConfig().Address,
},

View File

@@ -2,18 +2,13 @@ package command
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"os"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
@@ -102,33 +97,8 @@ func Server(cfg *config.Config) *cli.Command {
}
{
var tlsConf *tls.Config
if cfg.Events.EnableTLS {
var rootCAPool *x509.CertPool
if cfg.Events.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(cfg.Events.TLSRootCACertificate)
if err != nil {
return err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return err
}
cfg.Events.TLSInsecure = false
}
tlsConf = &tls.Config{
RootCAs: rootCAPool,
}
}
bus, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(cfg.Events.Endpoint),
natsjs.ClusterID(cfg.Events.Cluster),
)
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
@@ -29,7 +30,7 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
stream, err := getEventBus(cfg.Postprocessing.Events)
stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events))
if err != nil {
return err
}

View File

@@ -2,17 +2,12 @@ package command
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/store"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/oklog/run"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
@@ -53,7 +48,7 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()
{
bus, err := getEventBus(cfg.Postprocessing.Events)
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events))
if err != nil {
return err
}
@@ -110,32 +105,3 @@ func Server(cfg *config.Config) *cli.Command {
},
}
}
func getEventBus(evtsCfg config.Events) (events.Stream, error) {
var tlsConf *tls.Config
if evtsCfg.EnableTLS {
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return nil, err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return nil, err
}
evtsCfg.TLSInsecure = false
}
tlsConf = &tls.Config{
RootCAs: rootCAPool,
}
}
return stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
}

View File

@@ -2,11 +2,8 @@ package service
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"
"time"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
@@ -17,9 +14,7 @@ import (
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/token"
"github.com/cs3org/reva/v2/pkg/token/manager/jwt"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/jellydator/ttlcache/v2"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0"
@@ -78,33 +73,13 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type)
}
var tlsConf *tls.Config
if cfg.Events.EnableTLS {
var rootCAPool *x509.CertPool
if cfg.Events.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(cfg.Events.TLSRootCACertificate)
if err != nil {
return nil, teardown, err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return nil, teardown, err
}
cfg.Events.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: cfg.Events.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
bus, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(cfg.Events.Endpoint),
natsjs.ClusterID(cfg.Events.Cluster),
)
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,
TLSInsecure: cfg.Events.TLSInsecure,
TLSRootCACertificate: cfg.Events.TLSRootCACertificate,
})
if err != nil {
return nil, teardown, err
}

View File

@@ -123,6 +123,7 @@ func SharingConfigFromStruct(cfg *config.Config) map[string]interface{} {
"tls-insecure": cfg.Events.TLSInsecure,
"tls-root-ca-cert": cfg.Events.TLSRootCaCertPath,
"enable-tls": cfg.Events.EnableTLS,
"name": "sharing-eventsmiddleware",
},
"prometheus": map[string]interface{}{
"namespace": "ocis",

View File

@@ -89,7 +89,7 @@ func Server(cfg *config.Config) *cli.Command {
}
{
stream, err := event.NewStream(cfg.Events)
stream, err := event.NewStream(cfg)
if err != nil {
logger.Fatal().Err(err).Msg("can't connect to nats")
}

View File

@@ -32,7 +32,7 @@ func PurgeExpiredResources(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
stream, err := event.NewStream(cfg.Events)
stream, err := event.NewStream(cfg)
if err != nil {
return err
}

View File

@@ -1,51 +1,18 @@
package event
import (
"crypto/tls"
"crypto/x509"
"os"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/go-micro/plugins/v4/events/natsjs"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/config"
"go-micro.dev/v4/events"
)
// NewStream prepares the requested nats stream and returns it.
func NewStream(cfg config.Events) (events.Stream, error) {
var tlsConf *tls.Config
if cfg.EnableTLS {
var rootCAPool *x509.CertPool
if cfg.TLSRootCaCertPath != "" {
rootCrtFile, err := os.Open(cfg.TLSRootCaCertPath)
if err != nil {
return nil, err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return nil, err
}
cfg.TLSInsecure = false
}
tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
RootCAs: rootCAPool,
}
}
s, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(cfg.Addr),
natsjs.ClusterID(cfg.ClusterID),
)
if err != nil {
return nil, err
}
return s, nil
func NewStream(cfg *config.Config) (events.Stream, error) {
return stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{
Endpoint: cfg.Events.Addr,
Cluster: cfg.Events.ClusterID,
EnableTLS: cfg.Events.EnableTLS,
TLSInsecure: cfg.Events.TLSInsecure,
TLSRootCACertificate: cfg.Events.TLSRootCaCertPath,
})
}

View File

@@ -47,6 +47,7 @@ func StorageUsersConfigFromStruct(cfg *config.Config) map[string]interface{} {
"tls-insecure": cfg.Events.TLSInsecure,
"tls-root-ca-cert": cfg.Events.TLSRootCaCertPath,
"enable-tls": cfg.Events.EnableTLS,
"name": "storage-users-eventsmiddleware",
},
"prometheus": map[string]interface{}{
"namespace": "ocis",

View File

@@ -81,7 +81,7 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()
consumer, err := stream.NatsFromConfig(stream.NatsConfig(cfg.Events))
consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}

View File

@@ -19,13 +19,8 @@
package eventsmiddleware
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"os"
"go-micro.dev/v4/util/log"
"google.golang.org/grpc"
@@ -42,7 +37,6 @@ import (
"github.com/cs3org/reva/v2/pkg/rgrpc"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-micro/plugins/v4/events/natsjs"
)
const (
@@ -229,38 +223,17 @@ func publisherFromConfig(m map[string]interface{}) (events.Publisher, error) {
default:
return nil, fmt.Errorf("stream type '%s' not supported", typ)
case "nats":
address := m["address"].(string)
cid := m["clusterID"].(string)
enableTLS := m["enable-tls"].(bool)
var tlsConf *tls.Config
if enableTLS {
skipVerify := m["tls-insecure"].(bool)
var rootCAPool *x509.CertPool
if val, ok := m["tls-root-ca-cert"]; ok {
rootCACertPath := val.(string)
if rootCACertPath != "" {
f, err := os.Open(rootCACertPath)
if err != nil {
return nil, err
}
var certBytes bytes.Buffer
if _, err := io.Copy(&certBytes, f); err != nil {
return nil, err
}
rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(certBytes.Bytes())
skipVerify = false
}
}
tlsConf = &tls.Config{
InsecureSkipVerify: skipVerify,
RootCAs: rootCAPool,
}
var tlsCert string
val, ok := m["tls-root-ca-cert"]
if ok {
tlsCert = val.(string)
}
return stream.Nats(natsjs.TLSConfig(tlsConf), natsjs.Address(address), natsjs.ClusterID(cid))
return stream.NatsFromConfig(m["name"].(string), stream.NatsConfig{
Endpoint: m["address"].(string),
Cluster: m["clusterID"].(string),
EnableTLS: m["enable-tls"].(bool),
TLSInsecure: m["tls-insecure"].(bool),
TLSRootCACertificate: tlsCert,
})
}
}

View File

@@ -19,13 +19,9 @@
package storageprovider
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net/url"
"os"
"path"
@@ -49,7 +45,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/attribute"
@@ -76,11 +71,11 @@ type config struct {
}
type eventconfig struct {
NatsAddress string `mapstructure:"nats_address" docs:"address of the nats server"`
NatsClusterID string `mapstructure:"nats_clusterid" docs:"clusterid of the nats server"`
EnableTLS bool `mapstructure:"nats_enable_tls" docs:"events tls switch"`
Endpoint string `mapstructure:"nats_address" docs:"address of the nats server"`
Cluster string `mapstructure:"nats_clusterid" docs:"clusterid of the nats server"`
TLSInsecure bool `mapstructure:"tls_insecure" docs:"Whether to verify the server TLS certificates."`
TLSRootCACertificate string `mapstructure:"tls_root_ca_cert" docs:"The root CA certificate used to validate the server's TLS certificate."`
EnableTLS bool `mapstructure:"nats_enable_tls" docs:"events tls switch"`
}
func (c *config) init() {
@@ -1245,38 +1240,9 @@ func (v descendingMtime) Swap(i, j int) {
}
func estreamFromConfig(c eventconfig) (events.Stream, error) {
if c.NatsAddress == "" {
if c.Endpoint == "" {
return nil, nil
}
var (
rootCAPool *x509.CertPool
tlsConf *tls.Config
)
if c.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(c.TLSRootCACertificate)
if err != nil {
return nil, err
}
var certBytes bytes.Buffer
if _, err := io.Copy(&certBytes, rootCrtFile); err != nil {
return nil, err
}
rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(certBytes.Bytes())
c.TLSInsecure = false
tlsConf = &tls.Config{
InsecureSkipVerify: c.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
s, err := stream.Nats(natsjs.Address(c.NatsAddress), natsjs.ClusterID(c.NatsClusterID), natsjs.TLSConfig(tlsConf))
if err != nil {
return nil, err
}
return s, nil
return stream.NatsFromConfig("storageprovider", stream.NatsConfig(c))
}

View File

@@ -19,13 +19,8 @@
package dataprovider
import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"os"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/events"
@@ -35,7 +30,6 @@ import (
"github.com/cs3org/reva/v2/pkg/rhttp/router"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/mitchellh/mapstructure"
"github.com/rs/zerolog"
)
@@ -86,30 +80,13 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)
if conf.NatsAddress == "" || conf.NatsClusterID == "" {
log.Warn().Msg("missing or incomplete nats configuration. Events will not be published.")
} else {
var tlsConf *tls.Config
if conf.NatsEnableTLS {
var rootCAPool *x509.CertPool
if conf.NatsRootCACertPath != "" {
f, err := os.Open(conf.NatsRootCACertPath)
if err != nil {
return nil, err
}
var certBytes bytes.Buffer
if _, err := io.Copy(&certBytes, f); err != nil {
return nil, err
}
rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(certBytes.Bytes())
conf.NatsTLSInsecure = false
}
tlsConf = &tls.Config{
InsecureSkipVerify: conf.NatsTLSInsecure,
RootCAs: rootCAPool,
}
}
s, err := stream.Nats(natsjs.TLSConfig(tlsConf), natsjs.Address(conf.NatsAddress), natsjs.ClusterID(conf.NatsClusterID))
s, err := stream.NatsFromConfig("dataprovider", stream.NatsConfig{
Endpoint: conf.NatsAddress,
Cluster: conf.NatsClusterID,
EnableTLS: conf.NatsEnableTLS,
TLSInsecure: conf.NatsTLSInsecure,
TLSRootCACertificate: conf.NatsRootCACertPath,
})
if err != nil {
return nil, err
}

View File

@@ -25,7 +25,7 @@ type NatsConfig struct {
}
// NatsFromConfig returns a nats stream from the given config
func NatsFromConfig(cfg NatsConfig) (events.Stream, error) {
func NatsFromConfig(connName string, cfg NatsConfig) (events.Stream, error) {
var tlsConf *tls.Config
if cfg.EnableTLS {
var rootCAPool *x509.CertPool
@@ -53,11 +53,12 @@ func NatsFromConfig(cfg NatsConfig) (events.Stream, error) {
natsjs.Address(cfg.Endpoint),
natsjs.ClusterID(cfg.Cluster),
natsjs.SynchronousPublish(true),
natsjs.Name(connName),
)
}
// Nats returns a nats streaming client
// nats returns a nats streaming client
// retries exponentially to connect to a nats server
func Nats(opts ...natsjs.Option) (events.Stream, error) {
b := backoff.NewExponentialBackOff()

View File

@@ -19,12 +19,7 @@
package jsoncs3
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"io"
"os"
"strings"
"sync"
"time"
@@ -34,7 +29,6 @@ import (
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
@@ -131,10 +125,11 @@ type config struct {
// EventOptions are the configurable options for events
type EventOptions struct {
NatsAddress string `mapstructure:"natsaddress"`
NatsClusterID string `mapstructure:"natsclusterid"`
Endpoint string `mapstructure:"natsaddress"`
Cluster string `mapstructure:"natsclusterid"`
TLSInsecure bool `mapstructure:"tlsinsecure"`
TLSRootCACertificate string `mapstructure:"tlsrootcacertificate"`
EnableTLS bool `mapstructure:"enabletls"`
}
// Manager implements a share manager using a cs3 storage backend with local caching
@@ -176,38 +171,8 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
}
var es events.Stream
if c.Events.NatsAddress != "" {
evtsCfg := c.Events
var (
rootCAPool *x509.CertPool
tlsConf *tls.Config
)
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return nil, err
}
var certBytes bytes.Buffer
if _, err := io.Copy(&certBytes, rootCrtFile); err != nil {
return nil, err
}
rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(certBytes.Bytes())
evtsCfg.TLSInsecure = false
tlsConf = &tls.Config{
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
es, err = stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.NatsAddress),
natsjs.ClusterID(evtsCfg.NatsClusterID),
)
if c.Events.Endpoint != "" {
es, err = stream.NatsFromConfig("jsoncs3-share-manager", stream.NatsConfig(c.Events))
if err != nil {
return nil, err
}

View File

@@ -350,17 +350,12 @@ func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*lockedfile.File,
// link child name to parent if it is new
childNameLink := filepath.Join(n.ParentPath(), n.Name)
link, err := os.Readlink(childNameLink)
if err == nil && link != "../"+n.ID {
if err := os.Remove(childNameLink); err != nil {
return f, errors.Wrap(err, "Decomposedfs: could not remove symlink child entry")
}
}
if errors.Is(err, iofs.ErrNotExist) || link != "../"+n.ID {
relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
if err = os.Symlink(relativeNodePath, childNameLink); err != nil {
return f, errors.Wrap(err, "Decomposedfs: could not symlink child entry")
relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
if err = os.Symlink(relativeNodePath, childNameLink); err != nil {
if errors.Is(err, iofs.ErrExist) {
return nil, errtypes.AlreadyExists(n.Name)
}
return f, errors.Wrap(err, "Decomposedfs: could not symlink child entry")
}
// on a new file the sizeDiff is the fileSize

View File

@@ -52,10 +52,17 @@ func connectToNatsJetStream(options Options) (nats.JetStreamContext, error) {
nopts.Secure = true
nopts.TLSConfig = options.TLSConfig
}
if options.NkeyConfig != "" {
nopts.Nkey = options.NkeyConfig
}
if len(options.Address) > 0 {
nopts.Servers = strings.Split(options.Address, ",")
}
if options.Name != "" {
nopts.Name = options.Name
}
conn, err := nopts.Connect()
if err != nil {
return nil, fmt.Errorf("error connecting to nats at %v with tls enabled (%v): %v", options.Address, nopts.TLSConfig != nil, err)

View File

@@ -11,9 +11,11 @@ type Options struct {
ClusterID string
ClientID string
Address string
NkeyConfig string
TLSConfig *tls.Config
Logger logger.Logger
SyncPublish bool
Name string
}
// Option is a function which configures options.
@@ -47,6 +49,13 @@ func TLSConfig(t *tls.Config) Option {
}
}
// Nkey string to use when connecting to the cluster.
func NkeyConfig(nkey string) Option {
return func(o *Options) {
o.NkeyConfig = nkey
}
}
// Logger sets the underlyin logger
func Logger(log logger.Logger) Option {
return func(o *Options) {
@@ -60,3 +69,10 @@ func SynchronousPublish(sync bool) Option {
o.SyncPublish = sync
}
}
// Name allows to add a name to the natsjs connection
func Name(name string) Option {
return func(o *Options) {
o.Name = name
}
}

4
vendor/modules.txt vendored
View File

@@ -352,7 +352,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.15.1-0.20230810092810-8d195c7859c7
# github.com/cs3org/reva/v2 v2.15.1-0.20230816081257-e3a2be91bc4f
## explicit; go 1.20
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime
@@ -882,7 +882,7 @@ github.com/go-logr/stdr
# github.com/go-micro/plugins/v4/client/grpc v1.2.0
## explicit; go 1.17
github.com/go-micro/plugins/v4/client/grpc
# github.com/go-micro/plugins/v4/events/natsjs v1.2.1
# github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7
## explicit; go 1.17
github.com/go-micro/plugins/v4/events/natsjs
# github.com/go-micro/plugins/v4/logger/zerolog v1.2.0