mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2025-12-30 17:00:57 -06:00
Nats tls (#4781)
* use tls for nats connections * add config options for nats client tls config * add nats tls config to CI * add function to create a certpool * add option to provide a rootCA to validate the server's TLS certificate * add option to provide a rootCA to validate the server's TLS certificate * add option to provide a rootCA to validate the server's TLS certificate * add option to provide a rootCA to validate the server's TLS certificate * configure nats clients in reva to use tls
This commit is contained in:
@@ -1889,6 +1889,11 @@ def ocisServer(storage, accounts_hash_difficulty = 4, volumes = [], depends_on =
|
||||
"IDM_CREATE_DEMO_USERS": True,
|
||||
"IDM_ADMIN_PASSWORD": "admin", # override the random admin password from `ocis init`
|
||||
"FRONTEND_SEARCH_MIN_LENGTH": "2",
|
||||
"AUDIT_EVENTS_TLS_INSECURE": True,
|
||||
"GRAPH_EVENTS_TLS_INSECURE": True,
|
||||
"NOTIFICATIONS_EVENTS_TLS_INSECURE": True,
|
||||
"SEARCH_EVENTS_TLS_INSECURE": True,
|
||||
"NATS_TLS_SKIP_VERIFY_CLIENT_CERT": True,
|
||||
}
|
||||
wait_for_ocis = {
|
||||
"name": "wait-for-ocis-server",
|
||||
|
||||
5
changelog/unreleased/nats-tls.md
Normal file
5
changelog/unreleased/nats-tls.md
Normal file
@@ -0,0 +1,5 @@
|
||||
Enhancement: Secure the nats connectin with TLS
|
||||
|
||||
Encyrpted the connection to the event broker using TLS.
|
||||
|
||||
https://github.com/owncloud/ocis/pull/4781
|
||||
4
go.mod
4
go.mod
@@ -9,8 +9,8 @@ require (
|
||||
github.com/armon/go-radix v1.0.0
|
||||
github.com/blevesearch/bleve/v2 v2.3.4
|
||||
github.com/coreos/go-oidc/v3 v3.4.0
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20220929083235-bb0b1a236d6c
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221006145249-70bf0e95ec00
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20221005085457-19ea8088a512
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221012104058-ae7c58b9bffa
|
||||
github.com/disintegration/imaging v1.6.2
|
||||
github.com/ggwhite/go-masker v1.0.9
|
||||
github.com/go-chi/chi/v5 v5.0.7
|
||||
|
||||
8
go.sum
8
go.sum
@@ -293,10 +293,10 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo
|
||||
github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4=
|
||||
github.com/crewjam/saml v0.4.6 h1:XCUFPkQSJLvzyl4cW9OvpWUbRf0gE7VUpU8ZnilbeM4=
|
||||
github.com/crewjam/saml v0.4.6/go.mod h1:ZBOXnNPFzB3CgOkRm7Nd6IVdkG+l/wF+0ZXLqD96t1A=
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20220929083235-bb0b1a236d6c h1:b+YTmOGlf43mnF8MzO0fsy8/Ho8JLu44Iq5Y0fKLJMM=
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20220929083235-bb0b1a236d6c/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221006145249-70bf0e95ec00 h1:cmXDi1Uv/96k8lv7/mWShKIMd4cvOqmhtbrXtVxz+uM=
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221006145249-70bf0e95ec00/go.mod h1:UQVMlPZtcoPXLZSJ4ZRTruzn6PlRVmpEantwf1wGknU=
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20221005085457-19ea8088a512 h1:xTvaIsLu1ezoWOJKnV0ehgiowkOiEhMaylaI1lD/Axw=
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20221005085457-19ea8088a512/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221012104058-ae7c58b9bffa h1:DSeaakiPW5zYrGGEDO0BkSZWhqq6LS+rd1DQ1DPztJo=
|
||||
github.com/cs3org/reva/v2 v2.10.1-0.20221012104058-ae7c58b9bffa/go.mod h1:QUHLTf/ACFG2ueNP3u1dslv1bIWTTQAqvWFCorVke6o=
|
||||
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
|
||||
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
|
||||
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
|
||||
|
||||
28
ocis-pkg/crypto/crypto.go
Normal file
28
ocis-pkg/crypto/crypto.go
Normal file
@@ -0,0 +1,28 @@
|
||||
// Package crypto implements utility functions for handling crypto related files.
|
||||
package crypto
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
// NewCertPoolFromPEM reads certificates from io.Reader and returns a x509.CertPool
|
||||
// containing those certificates.
|
||||
func NewCertPoolFromPEM(crts ...io.Reader) (*x509.CertPool, error) {
|
||||
certPool := x509.NewCertPool()
|
||||
|
||||
var buf bytes.Buffer
|
||||
for _, c := range crts {
|
||||
if _, err := io.Copy(&buf, c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !certPool.AppendCertsFromPEM(buf.Bytes()) {
|
||||
return nil, errors.New("failed to append cert from PEM")
|
||||
}
|
||||
buf.Reset()
|
||||
}
|
||||
|
||||
return certPool, nil
|
||||
}
|
||||
@@ -40,7 +40,7 @@ var _ = Describe("Crypto", func() {
|
||||
Describe("Creating key / certificate pair", func() {
|
||||
Context("For ocis-proxy in the location of the user config directory", func() {
|
||||
It(fmt.Sprintf("Creates the cert / key tuple in: %s", filepath.Join(userConfigDir, "ocis")), func() {
|
||||
if err := crypto.GenCert(config.Proxy.HTTP.TLSCert, config.Proxy.HTTP.TLSKey, log.NewLogger()); err != nil {
|
||||
if err := crypto.GenCert(config.Proxy.HTTP.TLSCert, config.Proxy.HTTP.TLSKey, log.NopLogger()); err != nil {
|
||||
Fail(err.Error())
|
||||
}
|
||||
|
||||
@@ -54,4 +54,50 @@ var _ = Describe("Crypto", func() {
|
||||
})
|
||||
})
|
||||
})
|
||||
Describe("Creating a new cert pool", func() {
|
||||
var (
|
||||
crtOne string
|
||||
keyOne string
|
||||
crtTwo string
|
||||
keyTwo string
|
||||
)
|
||||
BeforeEach(func() {
|
||||
crtOne = filepath.Join(userConfigDir, "ocis/one.cert")
|
||||
keyOne = filepath.Join(userConfigDir, "ocis/one.key")
|
||||
crtTwo = filepath.Join(userConfigDir, "ocis/two.cert")
|
||||
keyTwo = filepath.Join(userConfigDir, "ocis/two.key")
|
||||
if err := crypto.GenCert(crtOne, keyOne, log.NopLogger()); err != nil {
|
||||
Fail(err.Error())
|
||||
}
|
||||
if err := crypto.GenCert(crtTwo, keyTwo, log.NopLogger()); err != nil {
|
||||
Fail(err.Error())
|
||||
}
|
||||
})
|
||||
It("handles one certificate", func() {
|
||||
f1, _ := os.Open(crtOne)
|
||||
defer f1.Close()
|
||||
|
||||
c, err := crypto.NewCertPoolFromPEM(f1)
|
||||
if err != nil {
|
||||
Fail(err.Error())
|
||||
}
|
||||
if len(c.Subjects()) != 1 {
|
||||
Fail("expected 1 certificate in the cert pool")
|
||||
}
|
||||
})
|
||||
It("handles multiple certificates", func() {
|
||||
f1, _ := os.Open(crtOne)
|
||||
f2, _ := os.Open(crtTwo)
|
||||
defer f1.Close()
|
||||
defer f2.Close()
|
||||
|
||||
c, err := crypto.NewCertPoolFromPEM(f1, f2)
|
||||
if err != nil {
|
||||
Fail(err.Error())
|
||||
}
|
||||
if len(c.Subjects()) != 2 {
|
||||
Fail("expected 2 certificates in the cert pool")
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -58,7 +58,6 @@ func TestPersistKey(t *testing.T) {
|
||||
|
||||
type args struct {
|
||||
keyName string
|
||||
l log.Logger
|
||||
pk interface{}
|
||||
}
|
||||
tests := []struct {
|
||||
@@ -69,7 +68,6 @@ func TestPersistKey(t *testing.T) {
|
||||
name: "writes a private key (rsa) to the specified location",
|
||||
args: args{
|
||||
keyName: keyPath,
|
||||
l: log.NewLogger(),
|
||||
pk: rsaPk,
|
||||
},
|
||||
},
|
||||
@@ -77,14 +75,13 @@ func TestPersistKey(t *testing.T) {
|
||||
name: "writes a private key (ecdsa) to the specified location",
|
||||
args: args{
|
||||
keyName: keyPath,
|
||||
l: log.NewLogger(),
|
||||
pk: ecdsaPk,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if err := persistKey(tt.args.keyName, tt.args.l, tt.args.pk); err != nil {
|
||||
if err := persistKey(tt.args.keyName, log.NopLogger(), tt.args.pk); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
@@ -107,7 +104,6 @@ func TestPersistCertificate(t *testing.T) {
|
||||
|
||||
type args struct {
|
||||
certName string
|
||||
l log.Logger
|
||||
pk interface{}
|
||||
}
|
||||
tests := []struct {
|
||||
@@ -119,7 +115,6 @@ func TestPersistCertificate(t *testing.T) {
|
||||
name: "store a certificate with an rsa private key",
|
||||
args: args{
|
||||
certName: certPath,
|
||||
l: log.NewLogger(),
|
||||
pk: rsaPk,
|
||||
},
|
||||
wantErr: false,
|
||||
@@ -128,7 +123,6 @@ func TestPersistCertificate(t *testing.T) {
|
||||
name: "store a certificate with an ecdsa private key",
|
||||
args: args{
|
||||
certName: certPath,
|
||||
l: log.NewLogger(),
|
||||
pk: ecdsaPk,
|
||||
},
|
||||
wantErr: false,
|
||||
@@ -137,7 +131,6 @@ func TestPersistCertificate(t *testing.T) {
|
||||
name: "should fail",
|
||||
args: args{
|
||||
certName: certPath,
|
||||
l: log.NewLogger(),
|
||||
pk: 42,
|
||||
},
|
||||
wantErr: true,
|
||||
@@ -146,7 +139,7 @@ func TestPersistCertificate(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if err := persistCertificate(tt.args.certName, tt.args.l, tt.args.pk); err != nil {
|
||||
if err := persistCertificate(tt.args.certName, log.NopLogger(), tt.args.pk); err != nil {
|
||||
if !tt.wantErr {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -53,6 +53,11 @@ func LoggerFromConfig(name string, cfg *shared.Log) Logger {
|
||||
)
|
||||
}
|
||||
|
||||
// NopLogger initializes a no-operation logger.
|
||||
func NopLogger() Logger {
|
||||
return Logger{zerolog.Nop()}
|
||||
}
|
||||
|
||||
// NewLogger initializes a new logger instance.
|
||||
func NewLogger(opts ...Option) Logger {
|
||||
options := newOptions(opts...)
|
||||
|
||||
@@ -2,12 +2,16 @@ package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/events/server"
|
||||
"github.com/go-micro/plugins/v4/events/natsjs"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
|
||||
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
|
||||
"github.com/owncloud/ocis/v2/services/audit/pkg/config"
|
||||
"github.com/owncloud/ocis/v2/services/audit/pkg/config/parser"
|
||||
"github.com/owncloud/ocis/v2/services/audit/pkg/logging"
|
||||
@@ -36,7 +40,27 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
defer cancel()
|
||||
|
||||
evtsCfg := cfg.Events
|
||||
|
||||
var rootCAPool *x509.CertPool
|
||||
if evtsCfg.TLSRootCACertificate != "" {
|
||||
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
evtsCfg.TLSInsecure = false
|
||||
}
|
||||
|
||||
tlsConf := &tls.Config{
|
||||
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
|
||||
RootCAs: rootCAPool,
|
||||
}
|
||||
client, err := server.NewNatsStream(
|
||||
natsjs.TLSConfig(tlsConf),
|
||||
natsjs.Address(evtsCfg.Endpoint),
|
||||
natsjs.ClusterID(evtsCfg.Cluster),
|
||||
)
|
||||
|
||||
@@ -23,9 +23,11 @@ type Config struct {
|
||||
|
||||
// Events combines the configuration options for the event bus.
|
||||
type Events struct {
|
||||
Endpoint string `yaml:"endpoint" env:"AUDIT_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
Cluster string `yaml:"cluster" env:"AUDIT_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
|
||||
ConsumerGroup string `yaml:"group" env:"AUDIT_EVENTS_GROUP" desc:"The consumergroup of the service. One group will only get one copy of an event."`
|
||||
Endpoint string `yaml:"endpoint" env:"AUDIT_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
Cluster string `yaml:"cluster" env:"AUDIT_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
|
||||
ConsumerGroup string `yaml:"group" env:"AUDIT_EVENTS_GROUP" desc:"The consumergroup of the service. One group will only get one copy of an event."`
|
||||
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;AUDIT_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
|
||||
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"AUDIT_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided AUDIT_EVENTS_TLS_INSECURE will be seen as false."`
|
||||
}
|
||||
|
||||
// Auditlog holds audit log information
|
||||
|
||||
@@ -70,6 +70,8 @@ type Identity struct {
|
||||
|
||||
// Events combines the configuration options for the event bus.
|
||||
type Events struct {
|
||||
Endpoint string `yaml:"endpoint" env:"GRAPH_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Set to a empty string to disable emitting events."`
|
||||
Cluster string `yaml:"cluster" env:"GRAPH_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
Endpoint string `yaml:"endpoint" env:"GRAPH_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Set to a empty string to disable emitting events."`
|
||||
Cluster string `yaml:"cluster" env:"GRAPH_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;GRAPH_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
|
||||
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"GRAPH_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided GRAPH_EVENTS_TLS_INSECURE will be seen as false."`
|
||||
}
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"os"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/events/server"
|
||||
chimiddleware "github.com/go-chi/chi/v5/middleware"
|
||||
"github.com/go-micro/plugins/v4/events/natsjs"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/account"
|
||||
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/middleware"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/service/http"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/version"
|
||||
@@ -33,7 +38,26 @@ func Server(opts ...Option) (http.Service, error) {
|
||||
|
||||
if options.Config.Events.Endpoint != "" {
|
||||
var err error
|
||||
var rootCAPool *x509.CertPool
|
||||
if options.Config.Events.TLSRootCACertificate != "" {
|
||||
rootCrtFile, err := os.Open(options.Config.Events.TLSRootCACertificate)
|
||||
if err != nil {
|
||||
return http.Service{}, err
|
||||
}
|
||||
|
||||
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
|
||||
if err != nil {
|
||||
return http.Service{}, err
|
||||
}
|
||||
options.Config.Events.TLSInsecure = false
|
||||
}
|
||||
|
||||
tlsConf := &tls.Config{
|
||||
InsecureSkipVerify: options.Config.Events.TLSInsecure, //nolint:gosec
|
||||
RootCAs: rootCAPool,
|
||||
}
|
||||
publisher, err = server.NewNatsStream(
|
||||
natsjs.TLSConfig(tlsConf),
|
||||
natsjs.Address(options.Config.Events.Endpoint),
|
||||
natsjs.ClusterID(options.Config.Events.Cluster),
|
||||
)
|
||||
|
||||
@@ -2,11 +2,13 @@ package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
|
||||
"github.com/oklog/run"
|
||||
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
|
||||
pkgcrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
|
||||
"github.com/owncloud/ocis/v2/services/nats/pkg/config"
|
||||
"github.com/owncloud/ocis/v2/services/nats/pkg/config/parser"
|
||||
"github.com/owncloud/ocis/v2/services/nats/pkg/logging"
|
||||
@@ -36,6 +38,26 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
|
||||
defer cancel()
|
||||
|
||||
// Generate a self-signing cert if no certificate is present
|
||||
if err := pkgcrypto.GenCert(cfg.Nats.TLSCert, cfg.Nats.TLSKey, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msgf("Could not generate test-certificate")
|
||||
}
|
||||
|
||||
crt, err := tls.LoadX509KeyPair(cfg.Nats.TLSCert, cfg.Nats.TLSKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clientAuth := tls.RequireAndVerifyClientCert
|
||||
if cfg.Nats.TLSSkipVerifyClientCert {
|
||||
clientAuth = tls.NoClientCert
|
||||
}
|
||||
|
||||
tlsConf := &tls.Config{
|
||||
MinVersion: tls.VersionTLS12,
|
||||
ClientAuth: clientAuth,
|
||||
Certificates: []tls.Certificate{crt},
|
||||
}
|
||||
natsServer, err := nats.NewNATSServer(
|
||||
ctx,
|
||||
logging.NewLogWrapper(logger),
|
||||
@@ -43,6 +65,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
nats.Port(cfg.Nats.Port),
|
||||
nats.ClusterID(cfg.Nats.ClusterID),
|
||||
nats.StoreDir(cfg.Nats.StoreDir),
|
||||
nats.TLSConfig(tlsConf),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -22,8 +22,11 @@ type Config struct {
|
||||
|
||||
// Nats is the nats config
|
||||
type Nats struct {
|
||||
Host string `yaml:"host" env:"NATS_NATS_HOST" desc:"Bind address."`
|
||||
Port int `yaml:"port" env:"NATS_NATS_PORT" desc:"Bind port."`
|
||||
ClusterID string `yaml:"clusterid" env:"NATS_NATS_CLUSTER_ID" desc:"ID of the NATS cluster."`
|
||||
StoreDir string `yaml:"store_dir" env:"NATS_NATS_STORE_DIR" desc:"Path for the NATS JetStream persistence directory."`
|
||||
Host string `yaml:"host" env:"NATS_NATS_HOST" desc:"Bind address."`
|
||||
Port int `yaml:"port" env:"NATS_NATS_PORT" desc:"Bind port."`
|
||||
ClusterID string `yaml:"clusterid" env:"NATS_NATS_CLUSTER_ID" desc:"ID of the NATS cluster."`
|
||||
StoreDir string `yaml:"store_dir" env:"NATS_NATS_STORE_DIR" desc:"Path for the NATS JetStream persistence directory."`
|
||||
TLSCert string `yaml:"tls_cert" env:"NATS_TLS_CERT" desc:"File name of the TLS server certificate for the nats listener."`
|
||||
TLSKey string `yaml:"tls_key" env:"NATS_TLS_KEY" desc:"File name for the TLS certificate key for the server certificate."`
|
||||
TLSSkipVerifyClientCert bool `yaml:"tls_skip_verify_client_cert" env:"OCIS_INSECURE;NATS_TLS_SKIP_VERIFY_CLIENT_CERT" desc:"Whether the nats server should skip the client certificate verification during the TLS handshake."`
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package defaults
|
||||
|
||||
import (
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/config/defaults"
|
||||
"github.com/owncloud/ocis/v2/services/nats/pkg/config"
|
||||
@@ -29,7 +29,9 @@ func DefaultConfig() *config.Config {
|
||||
Host: "127.0.0.1",
|
||||
Port: 9233,
|
||||
ClusterID: "ocis-cluster",
|
||||
StoreDir: path.Join(defaults.BaseDataPath(), "nats"),
|
||||
StoreDir: filepath.Join(defaults.BaseDataPath(), "nats"),
|
||||
TLSCert: filepath.Join(defaults.BaseDataPath(), "nats/tls.crt"),
|
||||
TLSKey: filepath.Join(defaults.BaseDataPath(), "nats/tls.key"),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ func NewNATSServer(ctx context.Context, logger nserver.Logger, opts ...NatsOptio
|
||||
|
||||
// enable JetStream
|
||||
natsOpts.JetStream = true
|
||||
natsOpts.AllowNonTLS = false
|
||||
|
||||
server, err := nserver.NewServer(natsOpts)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
|
||||
nserver "github.com/nats-io/nats-server/v2/server"
|
||||
)
|
||||
|
||||
@@ -34,3 +36,10 @@ func StoreDir(StoreDir string) NatsOption {
|
||||
o.StoreDir = StoreDir
|
||||
}
|
||||
}
|
||||
|
||||
// TLSConfig sets the tls config for the nats server
|
||||
func TLSConfig(c *tls.Config) NatsOption {
|
||||
return func(o *nserver.Options) {
|
||||
o.TLSConfig = c
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,17 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/events/server"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/go-micro/plugins/v4/events/natsjs"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/crypto"
|
||||
"github.com/owncloud/ocis/v2/services/notifications/pkg/channels"
|
||||
"github.com/owncloud/ocis/v2/services/notifications/pkg/config"
|
||||
"github.com/owncloud/ocis/v2/services/notifications/pkg/config/parser"
|
||||
@@ -35,7 +39,26 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
}
|
||||
|
||||
evtsCfg := cfg.Notifications.Events
|
||||
var rootCAPool *x509.CertPool
|
||||
if evtsCfg.TLSRootCACertificate != "" {
|
||||
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rootCAPool, err = crypto.NewCertPoolFromPEM(rootCrtFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
evtsCfg.TLSInsecure = false
|
||||
}
|
||||
|
||||
tlsConf := &tls.Config{
|
||||
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
|
||||
RootCAs: rootCAPool,
|
||||
}
|
||||
client, err := server.NewNatsStream(
|
||||
natsjs.TLSConfig(tlsConf),
|
||||
natsjs.Address(evtsCfg.Endpoint),
|
||||
natsjs.ClusterID(evtsCfg.Cluster),
|
||||
)
|
||||
|
||||
@@ -43,7 +43,9 @@ type SMTP struct {
|
||||
|
||||
// Events combines the configuration options for the event bus.
|
||||
type Events struct {
|
||||
Endpoint string `yaml:"endpoint" env:"NOTIFICATIONS_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
Cluster string `yaml:"cluster" env:"NOTIFICATIONS_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
|
||||
ConsumerGroup string `yaml:"group" env:"NOTIFICATIONS_EVENTS_GROUP" desc:"Name of the event group / queue on the event system."`
|
||||
Endpoint string `yaml:"endpoint" env:"NOTIFICATIONS_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
Cluster string `yaml:"cluster" env:"NOTIFICATIONS_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
|
||||
ConsumerGroup string `yaml:"group" env:"NOTIFICATIONS_EVENTS_GROUP" desc:"Name of the event group / queue on the event system."`
|
||||
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;NOTIFICATIONS_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
|
||||
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"NOTIFICATIONS_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided NOTIFICATIONS_EVENTS_TLS_INSECURE will be seen as false."`
|
||||
}
|
||||
|
||||
@@ -29,7 +29,9 @@ type Config struct {
|
||||
|
||||
// Events combines the configuration options for the event bus.
|
||||
type Events struct {
|
||||
Endpoint string `yaml:"endpoint" env:"SEARCH_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
Cluster string `yaml:"cluster" env:"SEARCH_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
|
||||
ConsumerGroup string `yaml:"group" env:"SEARCH_EVENTS_GROUP" desc:"The customer group of the service. One group will only get one copy of an event"`
|
||||
Endpoint string `yaml:"endpoint" env:"SEARCH_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
Cluster string `yaml:"cluster" env:"SEARCH_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
|
||||
ConsumerGroup string `yaml:"group" env:"SEARCH_EVENTS_GROUP" desc:"The customer group of the service. One group will only get one copy of an event"`
|
||||
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;SEARCH_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
|
||||
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"SEARCH_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided SEARCH_EVENTS_TLS_INSECURE will be seen as false."`
|
||||
}
|
||||
|
||||
@@ -2,7 +2,10 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/blevesearch/bleve/v2"
|
||||
@@ -16,6 +19,7 @@ import (
|
||||
"go-micro.dev/v4/metadata"
|
||||
grpcmetadata "google.golang.org/grpc/metadata"
|
||||
|
||||
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0"
|
||||
"github.com/owncloud/ocis/v2/services/search/pkg/config"
|
||||
@@ -32,7 +36,27 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, error) {
|
||||
|
||||
// Connect to nats to listen for changes that need to trigger an index update
|
||||
evtsCfg := cfg.Events
|
||||
|
||||
var rootCAPool *x509.CertPool
|
||||
if evtsCfg.TLSRootCACertificate != "" {
|
||||
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
evtsCfg.TLSInsecure = false
|
||||
}
|
||||
|
||||
tlsConf := &tls.Config{
|
||||
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
|
||||
RootCAs: rootCAPool,
|
||||
}
|
||||
client, err := server.NewNatsStream(
|
||||
natsjs.TLSConfig(tlsConf),
|
||||
natsjs.Address(evtsCfg.Endpoint),
|
||||
natsjs.ClusterID(evtsCfg.Cluster),
|
||||
)
|
||||
|
||||
@@ -150,6 +150,8 @@ type PublicSharingJSONCS3Driver struct {
|
||||
}
|
||||
|
||||
type Events struct {
|
||||
Addr string `yaml:"endpoint" env:"SHARING_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
ClusterID string `yaml:"cluster" env:"SHARING_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
|
||||
Addr string `yaml:"endpoint" env:"SHARING_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
ClusterID string `yaml:"cluster" env:"SHARING_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
|
||||
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;SHARING_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
|
||||
TLSRootCaCertPath string `yaml:"tls_root_ca_cert_path" env:"SHARING_EVENTS_TLS_ROOT_CA_CERT" desc:"The root CA certificate used to validate the server's TLS certificate. If provided SHARING_EVENTS_TLS_INSECURE will be seen as false."`
|
||||
}
|
||||
|
||||
@@ -69,8 +69,9 @@ func DefaultConfig() *config.Config {
|
||||
// TODO implement and add owncloudsql publicshare driver
|
||||
},
|
||||
Events: config.Events{
|
||||
Addr: "127.0.0.1:9233",
|
||||
ClusterID: "ocis-cluster",
|
||||
Addr: "127.0.0.1:9233",
|
||||
ClusterID: "ocis-cluster",
|
||||
TLSInsecure: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,10 +102,12 @@ func SharingConfigFromStruct(cfg *config.Config) map[string]interface{} {
|
||||
},
|
||||
"interceptors": map[string]interface{}{
|
||||
"eventsmiddleware": map[string]interface{}{
|
||||
"group": "sharing",
|
||||
"type": "nats",
|
||||
"address": cfg.Events.Addr,
|
||||
"clusterID": cfg.Events.ClusterID,
|
||||
"group": "sharing",
|
||||
"type": "nats",
|
||||
"address": cfg.Events.Addr,
|
||||
"clusterID": cfg.Events.ClusterID,
|
||||
"tls-insecure": cfg.Events.TLSInsecure,
|
||||
"tls-root-ca-cert": cfg.Events.TLSRootCaCertPath,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -133,8 +133,10 @@ type OwnCloudSQLDriver struct {
|
||||
}
|
||||
|
||||
type Events struct {
|
||||
Addr string `yaml:"endpoint" env:"STORAGE_USERS_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
ClusterID string `yaml:"cluster" env:"STORAGE_USERS_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
|
||||
Addr string `yaml:"endpoint" env:"STORAGE_USERS_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."`
|
||||
ClusterID string `yaml:"cluster" env:"STORAGE_USERS_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."`
|
||||
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;STORAGE_USERS_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
|
||||
TLSRootCaCertPath string `yaml:"tls_root_ca_cert_path" env:"STORAGE_USERS_EVENTS_TLS_ROOT_CA_CERT" desc:"The root CA certificate used to validate the server's TLS certificate. If provided STORAGE_USERS_EVENTS_TLS_INSECURE will be seen as false."`
|
||||
}
|
||||
|
||||
// Cache holds cache config
|
||||
|
||||
@@ -75,8 +75,9 @@ func DefaultConfig() *config.Config {
|
||||
},
|
||||
},
|
||||
Events: config.Events{
|
||||
Addr: "127.0.0.1:9233",
|
||||
ClusterID: "ocis-cluster",
|
||||
Addr: "127.0.0.1:9233",
|
||||
ClusterID: "ocis-cluster",
|
||||
TLSInsecure: true,
|
||||
},
|
||||
Cache: config.Cache{
|
||||
Store: "memory",
|
||||
|
||||
@@ -34,10 +34,12 @@ func StorageUsersConfigFromStruct(cfg *config.Config) map[string]interface{} {
|
||||
},
|
||||
"interceptors": map[string]interface{}{
|
||||
"eventsmiddleware": map[string]interface{}{
|
||||
"group": "sharing",
|
||||
"type": "nats",
|
||||
"address": cfg.Events.Addr,
|
||||
"clusterID": cfg.Events.ClusterID,
|
||||
"group": "sharing",
|
||||
"type": "nats",
|
||||
"address": cfg.Events.Addr,
|
||||
"clusterID": cfg.Events.ClusterID,
|
||||
"tls-insecure": cfg.Events.TLSInsecure,
|
||||
"tls-root-ca-cert": cfg.Events.TLSRootCaCertPath,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -47,11 +49,13 @@ func StorageUsersConfigFromStruct(cfg *config.Config) map[string]interface{} {
|
||||
// TODO build services dynamically
|
||||
"services": map[string]interface{}{
|
||||
"dataprovider": map[string]interface{}{
|
||||
"prefix": cfg.HTTP.Prefix,
|
||||
"driver": cfg.Driver,
|
||||
"drivers": UserDrivers(cfg),
|
||||
"nats_address": cfg.Events.Addr,
|
||||
"nats_clusterID": cfg.Events.ClusterID,
|
||||
"prefix": cfg.HTTP.Prefix,
|
||||
"driver": cfg.Driver,
|
||||
"drivers": UserDrivers(cfg),
|
||||
"nats_address": cfg.Events.Addr,
|
||||
"nats_clusterID": cfg.Events.ClusterID,
|
||||
"nats_tls_insecure": cfg.Events.TLSInsecure,
|
||||
"nats_root_ca_cert_path": cfg.Events.TLSRootCaCertPath,
|
||||
"data_txs": map[string]interface{}{
|
||||
"simple": map[string]interface{}{
|
||||
"cache_store": cfg.Cache.Store,
|
||||
|
||||
Reference in New Issue
Block a user