mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-06 12:19:37 -06:00
Merge pull request #1416 from opencloud-eu/nats-connection-names
Nats connection names
This commit is contained in:
41
pkg/generators/natsnames.go
Normal file
41
pkg/generators/natsnames.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package generators
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// NType is an enum type for the different types of NATS connections
|
||||
type NType int
|
||||
|
||||
// Enum values for NType
|
||||
const (
|
||||
NTypeBus NType = iota
|
||||
NTypeKeyValue
|
||||
NTypeRegistry
|
||||
)
|
||||
|
||||
// String returns the string representation of a NType
|
||||
func (n NType) String() string {
|
||||
return []string{"bus", "kv", "reg"}[n]
|
||||
}
|
||||
|
||||
// GenerateConnectionName generates a connection name for a NATS connection
|
||||
// The connection name will be formatted as follows: "hostname:pid:service:type"
|
||||
func GenerateConnectionName(service string, ntype NType) string {
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
host = ""
|
||||
}
|
||||
|
||||
return firstNRunes(host, 5) + ":" + strconv.Itoa(os.Getpid()) + ":" + service + ":" + ntype.String()
|
||||
}
|
||||
|
||||
// firstNRunes returns the first n runes of a string
|
||||
func firstNRunes(s string, n int) string {
|
||||
runes := []rune(s)
|
||||
if n > len(runes) {
|
||||
n = len(runes)
|
||||
}
|
||||
return string(runes[:n])
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
type storeOptionsKey struct{}
|
||||
type defaultTTLKey struct{}
|
||||
type serviceNameKey struct{}
|
||||
|
||||
// StoreOptions sets the options for the underlying store
|
||||
func StoreOptions(opts []store.Option) registry.Option {
|
||||
@@ -30,3 +31,14 @@ func DefaultTTL(t time.Duration) registry.Option {
|
||||
o.Context = context.WithValue(o.Context, defaultTTLKey{}, t)
|
||||
}
|
||||
}
|
||||
|
||||
// ServiceName links the service name to the registry if possible.
|
||||
// The name will be part of the connection name to the Nats registry
|
||||
func ServiceName(name string) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, serviceNameKey{}, name)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"go-micro.dev/v4/registry"
|
||||
"go-micro.dev/v4/server"
|
||||
"go-micro.dev/v4/store"
|
||||
@@ -29,7 +30,15 @@ var (
|
||||
)
|
||||
|
||||
func init() {
|
||||
cmd.DefaultRegistries[_registryName] = NewRegistry
|
||||
cmd.DefaultRegistries[_registryName] = NewRegistryMicro
|
||||
}
|
||||
|
||||
// NewRegistryMicro returns a new natsjs registry, forcing the service name
|
||||
// to be "_go-micro". This is the registry that is intended to be used by
|
||||
// go-micro
|
||||
func NewRegistryMicro(opts ...registry.Option) registry.Registry {
|
||||
overwrittenOpts := append(opts, ServiceName("_go-micro"))
|
||||
return NewRegistry(overwrittenOpts...)
|
||||
}
|
||||
|
||||
// NewRegistry returns a new natsjs registry
|
||||
@@ -186,6 +195,11 @@ func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
|
||||
storeoptions = append(storeoptions, natsjskv.DefaultTTL(defaultTTL))
|
||||
}
|
||||
|
||||
serviceName := "_unknown" // use "_unknown" as default service name if nothing else is provided
|
||||
if name, ok := opts.Context.Value(serviceNameKey{}).(string); ok {
|
||||
serviceName = name
|
||||
}
|
||||
|
||||
addr := []string{"127.0.0.1:9233"}
|
||||
if len(opts.Addrs) > 0 {
|
||||
addr = opts.Addrs
|
||||
@@ -195,7 +209,7 @@ func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
|
||||
storeoptions = append(storeoptions, store.Nodes(addr...))
|
||||
|
||||
natsOptions := nats.GetDefaultOptions()
|
||||
natsOptions.Name = "nats-js-kv-registry"
|
||||
natsOptions.Name = generators.GenerateConnectionName(serviceName, generators.NTypeRegistry)
|
||||
natsOptions.User, natsOptions.Password = getAuth()
|
||||
natsOptions.ReconnectedCB = func(_ *nats.Conn) {
|
||||
if err := n.Init(); err != nil {
|
||||
|
||||
@@ -59,6 +59,7 @@ func GetRegistry(opts ...Option) mRegistry.Registry {
|
||||
_reg = natsjsregistry.NewRegistry(
|
||||
mRegistry.Addrs(cfg.Addresses...),
|
||||
natsjsregistry.DefaultTTL(cfg.RegisterTTL),
|
||||
natsjsregistry.ServiceName("_oc"),
|
||||
)
|
||||
case "memory":
|
||||
_reg = memr.NewRegistry()
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
microstore "go-micro.dev/v4/store"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/registry"
|
||||
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
|
||||
"github.com/opencloud-eu/opencloud/pkg/tracing"
|
||||
@@ -70,7 +71,8 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
|
||||
defer cancel()
|
||||
|
||||
evStream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
evStream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("Failed to initialize event stream")
|
||||
return err
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/opencloud-eu/reva/v2/pkg/rhttp"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
"github.com/opencloud-eu/opencloud/services/antivirus/pkg/config"
|
||||
"github.com/opencloud-eu/opencloud/services/antivirus/pkg/scanners"
|
||||
@@ -114,7 +115,8 @@ func (av Antivirus) Run() error {
|
||||
av.config.Events.TLSInsecure = false
|
||||
}
|
||||
|
||||
natsStream, err := stream.NatsFromConfig(av.config.Service.Name, false, stream.NatsConfig(av.config.Events))
|
||||
connName := generators.GenerateConnectionName(av.config.Service.Name, generators.NTypeBus)
|
||||
natsStream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(av.config.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/services/audit/pkg/config"
|
||||
"github.com/opencloud-eu/opencloud/services/audit/pkg/config/parser"
|
||||
"github.com/opencloud-eu/opencloud/services/audit/pkg/logging"
|
||||
@@ -36,7 +37,8 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
client, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/registry"
|
||||
"github.com/opencloud-eu/opencloud/pkg/tracing"
|
||||
"github.com/opencloud-eu/opencloud/pkg/version"
|
||||
@@ -68,7 +69,8 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
|
||||
defer cancel()
|
||||
|
||||
s, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
s, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
microstore "go-micro.dev/v4/store"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
|
||||
"github.com/opencloud-eu/opencloud/pkg/tracing"
|
||||
"github.com/opencloud-eu/opencloud/pkg/version"
|
||||
@@ -55,7 +56,8 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
|
||||
m.BuildInfo.WithLabelValues(version.GetString()).Set(1)
|
||||
|
||||
consumer, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
consumer, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"go-micro.dev/v4/metadata"
|
||||
"google.golang.org/protobuf/types/known/fieldmaskpb"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
"github.com/opencloud-eu/opencloud/pkg/middleware"
|
||||
"github.com/opencloud-eu/opencloud/pkg/registry"
|
||||
@@ -34,7 +35,8 @@ var _registeredEvents = []events.Unmarshaller{
|
||||
|
||||
// ListenForEvents listens for events and acts accordingly
|
||||
func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) error {
|
||||
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
bus, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
|
||||
if err != nil {
|
||||
l.Error().Err(err).Msg("cannot connect to nats")
|
||||
return err
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/account"
|
||||
"github.com/opencloud-eu/opencloud/pkg/cors"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/keycloak"
|
||||
"github.com/opencloud-eu/opencloud/pkg/middleware"
|
||||
"github.com/opencloud-eu/opencloud/pkg/registry"
|
||||
@@ -56,7 +57,8 @@ func Server(opts ...Option) (http.Service, error) {
|
||||
|
||||
if options.Config.Events.Endpoint != "" {
|
||||
var err error
|
||||
eventsStream, err = stream.NatsFromConfig(options.Config.Service.Name, false, stream.NatsConfig(options.Config.Events))
|
||||
connName := generators.GenerateConnectionName(options.Config.Service.Name, generators.NTypeBus)
|
||||
eventsStream, err = stream.NatsFromConfig(connName, false, stream.NatsConfig(options.Config.Events))
|
||||
if err != nil {
|
||||
options.Logger.Error().
|
||||
Err(err).
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/services/notifications/pkg/config"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/events"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/events/stream"
|
||||
@@ -31,7 +32,8 @@ func SendEmail(cfg *config.Config) *cli.Command {
|
||||
if !daily && !weekly {
|
||||
return errors.New("at least one of '--daily' or '--weekly' must be set")
|
||||
}
|
||||
s, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
s, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Notifications.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/registry"
|
||||
"github.com/opencloud-eu/opencloud/pkg/service/grpc"
|
||||
"github.com/opencloud-eu/opencloud/pkg/tracing"
|
||||
@@ -94,7 +95,8 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
registeredEvents[typ.String()] = e
|
||||
}
|
||||
|
||||
client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
client, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Notifications.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
"github.com/opencloud-eu/opencloud/pkg/service/grpc"
|
||||
"github.com/opencloud-eu/opencloud/pkg/tracing"
|
||||
@@ -104,7 +105,8 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
|
||||
{
|
||||
|
||||
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
bus, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config"
|
||||
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config/parser"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/events"
|
||||
@@ -40,7 +41,8 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command {
|
||||
return configlog.ReturnFatal(parser.ParseConfig(cfg))
|
||||
},
|
||||
Action: func(c *cli.Context) error {
|
||||
stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
stream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig{
|
||||
Endpoint: cfg.Postprocessing.Events.Endpoint,
|
||||
Cluster: cfg.Postprocessing.Events.Cluster,
|
||||
EnableTLS: cfg.Postprocessing.Events.EnableTLS,
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
"github.com/opencloud-eu/opencloud/pkg/version"
|
||||
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config"
|
||||
@@ -46,7 +47,8 @@ var (
|
||||
|
||||
// NewPostprocessingService returns a new instance of a postprocessing service
|
||||
func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.Store, tp trace.TracerProvider, cfg *config.Config) (*PostprocessingService, error) {
|
||||
pub, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
pub, err := stream.NatsFromConfig(connName, false, stream.NatsConfig{
|
||||
Endpoint: cfg.Postprocessing.Events.Endpoint,
|
||||
Cluster: cfg.Postprocessing.Events.Cluster,
|
||||
EnableTLS: cfg.Postprocessing.Events.EnableTLS,
|
||||
@@ -55,11 +57,11 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.
|
||||
AuthUsername: cfg.Postprocessing.Events.AuthUsername,
|
||||
AuthPassword: cfg.Postprocessing.Events.AuthPassword,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
raw, err := raw.FromConfig(ctx, cfg.Service.Name, raw.Config{
|
||||
|
||||
raw, err := raw.FromConfig(ctx, connName, raw.Config{
|
||||
Endpoint: cfg.Postprocessing.Events.Endpoint,
|
||||
Cluster: cfg.Postprocessing.Events.Cluster,
|
||||
EnableTLS: cfg.Postprocessing.Events.EnableTLS,
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/justinas/alice"
|
||||
"github.com/oklog/run"
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
pkgmiddleware "github.com/opencloud-eu/opencloud/pkg/middleware"
|
||||
"github.com/opencloud-eu/opencloud/pkg/oidc"
|
||||
@@ -157,7 +158,8 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
var publisher events.Stream
|
||||
if cfg.Events.Endpoint != "" {
|
||||
var err error
|
||||
publisher, err = stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
publisher, err = stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
|
||||
if err != nil {
|
||||
logger.Error().
|
||||
Err(err).
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"go-micro.dev/v4/metadata"
|
||||
grpcmetadata "google.golang.org/grpc/metadata"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
"github.com/opencloud-eu/opencloud/pkg/registry"
|
||||
v0 "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0"
|
||||
@@ -115,7 +116,8 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
|
||||
|
||||
// setup event handling
|
||||
|
||||
stream, err := raw.FromConfig(context.Background(), cfg.Service.Name, raw.Config{
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
stream, err := raw.FromConfig(context.Background(), connName, raw.Config{
|
||||
Endpoint: cfg.Events.Endpoint,
|
||||
Cluster: cfg.Events.Cluster,
|
||||
EnableTLS: cfg.Events.EnableTLS,
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/defaults"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
"github.com/opencloud-eu/opencloud/services/sharing/pkg/config"
|
||||
)
|
||||
@@ -135,7 +136,7 @@ func SharingConfigFromStruct(cfg *config.Config, logger log.Logger) (map[string]
|
||||
"tls-insecure": cfg.Events.TLSInsecure,
|
||||
"tls-root-ca-cert": cfg.Events.TLSRootCaCertPath,
|
||||
"enable-tls": cfg.Events.EnableTLS,
|
||||
"name": "sharing-eventsmiddleware",
|
||||
"name": generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus),
|
||||
"username": cfg.Events.AuthUsername,
|
||||
"password": cfg.Events.AuthPassword,
|
||||
},
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
"github.com/opencloud-eu/opencloud/pkg/tracing"
|
||||
"github.com/opencloud-eu/opencloud/services/sse/pkg/config"
|
||||
@@ -52,7 +53,8 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
}
|
||||
|
||||
{
|
||||
natsStream, err := stream.NatsFromConfig(cfg.Service.Name, true, stream.NatsConfig(cfg.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
natsStream, err := stream.NatsFromConfig(connName, true, stream.NatsConfig(cfg.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/services/storage-users/pkg/config"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/events/stream"
|
||||
"go-micro.dev/v4/events"
|
||||
@@ -8,7 +9,8 @@ import (
|
||||
|
||||
// NewStream prepares the requested nats stream and returns it.
|
||||
func NewStream(cfg *config.Config) (events.Stream, error) {
|
||||
return stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
return stream.NatsFromConfig(connName, false, stream.NatsConfig{
|
||||
Endpoint: cfg.Events.Addr,
|
||||
Cluster: cfg.Events.ClusterID,
|
||||
EnableTLS: cfg.Events.EnableTLS,
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/services/storage-users/pkg/config"
|
||||
)
|
||||
|
||||
@@ -57,7 +58,7 @@ func StorageUsersConfigFromStruct(cfg *config.Config) map[string]interface{} {
|
||||
"tls-insecure": cfg.Events.TLSInsecure,
|
||||
"tls-root-ca-cert": cfg.Events.TLSRootCaCertPath,
|
||||
"enable-tls": cfg.Events.EnableTLS,
|
||||
"name": "storage-users-eventsmiddleware",
|
||||
"name": generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus),
|
||||
"username": cfg.Events.AuthUsername,
|
||||
"password": cfg.Events.AuthPassword,
|
||||
},
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
microstore "go-micro.dev/v4/store"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
|
||||
"github.com/opencloud-eu/opencloud/pkg/generators"
|
||||
"github.com/opencloud-eu/opencloud/pkg/registry"
|
||||
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
|
||||
"github.com/opencloud-eu/opencloud/pkg/tracing"
|
||||
@@ -76,7 +77,8 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
|
||||
defer cancel()
|
||||
|
||||
stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
|
||||
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
|
||||
stream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user