diff --git a/pkg/generators/natsnames.go b/pkg/generators/natsnames.go new file mode 100644 index 000000000..93cb28f12 --- /dev/null +++ b/pkg/generators/natsnames.go @@ -0,0 +1,38 @@ +package generators + +import ( + "os" + "strconv" +) + +type NType int + +const ( + NTYPE_BUS NType = iota + NTYPE_KEYVALUE + NTYPE_REGISTRY +) + +func (n NType) String() string { + return []string{"bus", "kv", "reg"}[n] +} + +func GenerateConnectionName(service string, ntype NType) string { + host, err := os.Hostname() + if err != nil { + host = "" + } + + return firstNRunes(host, 5) + ":" + strconv.Itoa(os.Getpid()) + ":" + service + ":" + ntype.String() +} + +func firstNRunes(s string, n int) string { + i := 0 + for j := range s { + if i == n { + return s[:j] + } + i++ + } + return s +} diff --git a/services/activitylog/pkg/command/server.go b/services/activitylog/pkg/command/server.go index 70884d31a..32156d355 100644 --- a/services/activitylog/pkg/command/server.go +++ b/services/activitylog/pkg/command/server.go @@ -13,6 +13,7 @@ import ( microstore "go-micro.dev/v4/store" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/registry" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" @@ -70,7 +71,8 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() - evStream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + evStream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { logger.Error().Err(err).Msg("Failed to initialize event stream") return err diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index 10cbe9ac6..9c9e2cca1 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -20,6 +20,7 @@ import ( "github.com/opencloud-eu/reva/v2/pkg/rhttp" "go.opentelemetry.io/otel/trace" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/services/antivirus/pkg/config" "github.com/opencloud-eu/opencloud/services/antivirus/pkg/scanners" @@ -114,7 +115,8 @@ func (av Antivirus) Run() error { av.config.Events.TLSInsecure = false } - natsStream, err := stream.NatsFromConfig(av.config.Service.Name, false, stream.NatsConfig(av.config.Events)) + connName := generators.GenerateConnectionName(av.config.Service.Name, generators.NTYPE_BUS) + natsStream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(av.config.Events)) if err != nil { return err } diff --git a/services/audit/pkg/command/server.go b/services/audit/pkg/command/server.go index 30ca082c6..99286f7eb 100644 --- a/services/audit/pkg/command/server.go +++ b/services/audit/pkg/command/server.go @@ -10,6 +10,7 @@ import ( "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/services/audit/pkg/config" "github.com/opencloud-eu/opencloud/services/audit/pkg/config/parser" "github.com/opencloud-eu/opencloud/services/audit/pkg/logging" @@ -36,7 +37,8 @@ func Server(cfg *config.Config) *cli.Command { ) defer cancel() - client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + client, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/clientlog/pkg/command/server.go b/services/clientlog/pkg/command/server.go index 334c5b729..f4b70990e 100644 --- a/services/clientlog/pkg/command/server.go +++ b/services/clientlog/pkg/command/server.go @@ -11,6 +11,7 @@ import ( "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/registry" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -68,7 +69,8 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() - s, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + s, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/eventhistory/pkg/command/server.go b/services/eventhistory/pkg/command/server.go index 6c31a47ab..25f9eb048 100644 --- a/services/eventhistory/pkg/command/server.go +++ b/services/eventhistory/pkg/command/server.go @@ -11,6 +11,7 @@ import ( microstore "go-micro.dev/v4/store" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -55,7 +56,8 @@ func Server(cfg *config.Config) *cli.Command { m.BuildInfo.WithLabelValues(version.GetString()).Set(1) - consumer, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + consumer, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/frontend/pkg/command/events.go b/services/frontend/pkg/command/events.go index 14aa292a9..60d988330 100644 --- a/services/frontend/pkg/command/events.go +++ b/services/frontend/pkg/command/events.go @@ -18,6 +18,7 @@ import ( "go-micro.dev/v4/metadata" "google.golang.org/protobuf/types/known/fieldmaskpb" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/pkg/middleware" "github.com/opencloud-eu/opencloud/pkg/registry" @@ -34,7 +35,8 @@ var _registeredEvents = []events.Unmarshaller{ // ListenForEvents listens for events and acts accordingly func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) error { - bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + bus, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { l.Error().Err(err).Msg("cannot connect to nats") return err diff --git a/services/graph/pkg/server/http/server.go b/services/graph/pkg/server/http/server.go index ab07d9657..2fe68b876 100644 --- a/services/graph/pkg/server/http/server.go +++ b/services/graph/pkg/server/http/server.go @@ -16,6 +16,7 @@ import ( "github.com/opencloud-eu/opencloud/pkg/account" "github.com/opencloud-eu/opencloud/pkg/cors" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/keycloak" "github.com/opencloud-eu/opencloud/pkg/middleware" "github.com/opencloud-eu/opencloud/pkg/registry" @@ -56,7 +57,8 @@ func Server(opts ...Option) (http.Service, error) { if options.Config.Events.Endpoint != "" { var err error - eventsStream, err = stream.NatsFromConfig(options.Config.Service.Name, false, stream.NatsConfig(options.Config.Events)) + connName := generators.GenerateConnectionName(options.Config.Service.Name, generators.NTYPE_BUS) + eventsStream, err = stream.NatsFromConfig(connName, false, stream.NatsConfig(options.Config.Events)) if err != nil { options.Logger.Error(). Err(err). diff --git a/services/notifications/pkg/command/send_email.go b/services/notifications/pkg/command/send_email.go index b903794d4..857b699bc 100644 --- a/services/notifications/pkg/command/send_email.go +++ b/services/notifications/pkg/command/send_email.go @@ -1,6 +1,7 @@ package command import ( + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/services/notifications/pkg/config" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/events/stream" @@ -31,7 +32,8 @@ func SendEmail(cfg *config.Config) *cli.Command { if !daily && !weekly { return errors.New("at least one of '--daily' or '--weekly' must be set") } - s, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + s, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Notifications.Events)) if err != nil { return err } diff --git a/services/notifications/pkg/command/server.go b/services/notifications/pkg/command/server.go index 7955849c5..447a11bcb 100644 --- a/services/notifications/pkg/command/server.go +++ b/services/notifications/pkg/command/server.go @@ -17,6 +17,7 @@ import ( "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/registry" "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" @@ -94,7 +95,8 @@ func Server(cfg *config.Config) *cli.Command { registeredEvents[typ.String()] = e } - client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + client, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Notifications.Events)) if err != nil { return err } diff --git a/services/policies/pkg/command/server.go b/services/policies/pkg/command/server.go index 5d7d221c0..e70f06c93 100644 --- a/services/policies/pkg/command/server.go +++ b/services/policies/pkg/command/server.go @@ -9,6 +9,7 @@ import ( "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" @@ -104,7 +105,8 @@ func Server(cfg *config.Config) *cli.Command { { - bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + bus, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/postprocessing/pkg/command/postprocessing.go b/services/postprocessing/pkg/command/postprocessing.go index 1494b335b..97a86d7cb 100644 --- a/services/postprocessing/pkg/command/postprocessing.go +++ b/services/postprocessing/pkg/command/postprocessing.go @@ -4,6 +4,7 @@ import ( "context" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config" "github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config/parser" "github.com/opencloud-eu/reva/v2/pkg/events" @@ -40,7 +41,8 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{ + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + stream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig{ Endpoint: cfg.Postprocessing.Events.Endpoint, Cluster: cfg.Postprocessing.Events.Cluster, EnableTLS: cfg.Postprocessing.Events.EnableTLS, diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index c57082509..f6af1d3b6 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config" @@ -46,7 +47,8 @@ var ( // NewPostprocessingService returns a new instance of a postprocessing service func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.Store, tp trace.TracerProvider, cfg *config.Config) (*PostprocessingService, error) { - pub, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{ + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + pub, err := stream.NatsFromConfig(connName, false, stream.NatsConfig{ Endpoint: cfg.Postprocessing.Events.Endpoint, Cluster: cfg.Postprocessing.Events.Cluster, EnableTLS: cfg.Postprocessing.Events.EnableTLS, @@ -55,11 +57,11 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store. AuthUsername: cfg.Postprocessing.Events.AuthUsername, AuthPassword: cfg.Postprocessing.Events.AuthPassword, }) - if err != nil { return nil, err } - raw, err := raw.FromConfig(ctx, cfg.Service.Name, raw.Config{ + + raw, err := raw.FromConfig(ctx, connName, raw.Config{ Endpoint: cfg.Postprocessing.Events.Endpoint, Cluster: cfg.Postprocessing.Events.Cluster, EnableTLS: cfg.Postprocessing.Events.EnableTLS, diff --git a/services/proxy/pkg/command/server.go b/services/proxy/pkg/command/server.go index ca0d75998..6046b9fab 100644 --- a/services/proxy/pkg/command/server.go +++ b/services/proxy/pkg/command/server.go @@ -12,6 +12,7 @@ import ( "github.com/justinas/alice" "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" pkgmiddleware "github.com/opencloud-eu/opencloud/pkg/middleware" "github.com/opencloud-eu/opencloud/pkg/oidc" @@ -157,7 +158,8 @@ func Server(cfg *config.Config) *cli.Command { var publisher events.Stream if cfg.Events.Endpoint != "" { var err error - publisher, err = stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + publisher, err = stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { logger.Error(). Err(err). diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index 914000ff7..ca25d93b2 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -24,6 +24,7 @@ import ( "go-micro.dev/v4/metadata" grpcmetadata "google.golang.org/grpc/metadata" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/pkg/registry" v0 "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0" @@ -115,7 +116,8 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) // setup event handling - stream, err := raw.FromConfig(context.Background(), cfg.Service.Name, raw.Config{ + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + stream, err := raw.FromConfig(context.Background(), connName, raw.Config{ Endpoint: cfg.Events.Endpoint, Cluster: cfg.Events.Cluster, EnableTLS: cfg.Events.EnableTLS, diff --git a/services/sse/pkg/command/server.go b/services/sse/pkg/command/server.go index 7dcecb327..7607a7ce7 100644 --- a/services/sse/pkg/command/server.go +++ b/services/sse/pkg/command/server.go @@ -10,6 +10,7 @@ import ( "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/services/sse/pkg/config" @@ -52,7 +53,8 @@ func Server(cfg *config.Config) *cli.Command { } { - natsStream, err := stream.NatsFromConfig(cfg.Service.Name, true, stream.NatsConfig(cfg.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + natsStream, err := stream.NatsFromConfig(connName, true, stream.NatsConfig(cfg.Events)) if err != nil { return err } diff --git a/services/storage-users/pkg/event/event.go b/services/storage-users/pkg/event/event.go index 7b0104ca5..e699ca2da 100644 --- a/services/storage-users/pkg/event/event.go +++ b/services/storage-users/pkg/event/event.go @@ -1,6 +1,7 @@ package event import ( + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/services/storage-users/pkg/config" "github.com/opencloud-eu/reva/v2/pkg/events/stream" "go-micro.dev/v4/events" @@ -8,7 +9,8 @@ import ( // NewStream prepares the requested nats stream and returns it. func NewStream(cfg *config.Config) (events.Stream, error) { - return stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{ + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + return stream.NatsFromConfig(connName, false, stream.NatsConfig{ Endpoint: cfg.Events.Addr, Cluster: cfg.Events.ClusterID, EnableTLS: cfg.Events.EnableTLS, diff --git a/services/userlog/pkg/command/server.go b/services/userlog/pkg/command/server.go index b8f185f3d..7147d9120 100644 --- a/services/userlog/pkg/command/server.go +++ b/services/userlog/pkg/command/server.go @@ -13,6 +13,7 @@ import ( microstore "go-micro.dev/v4/store" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/registry" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" @@ -76,7 +77,8 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() - stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events)) + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS) + stream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { return err }