feat: use names for connections to the nats event bus

This commit is contained in:
Juan Pablo Villafáñez
2025-03-18 11:27:09 +01:00
committed by Jörn Friedrich Dreyer
parent df10de7498
commit ca2dc823ef
18 changed files with 91 additions and 19 deletions

View File

@@ -0,0 +1,38 @@
package generators
import (
"os"
"strconv"
)
type NType int
const (
NTYPE_BUS NType = iota
NTYPE_KEYVALUE
NTYPE_REGISTRY
)
func (n NType) String() string {
return []string{"bus", "kv", "reg"}[n]
}
func GenerateConnectionName(service string, ntype NType) string {
host, err := os.Hostname()
if err != nil {
host = ""
}
return firstNRunes(host, 5) + ":" + strconv.Itoa(os.Getpid()) + ":" + service + ":" + ntype.String()
}
func firstNRunes(s string, n int) string {
i := 0
for j := range s {
if i == n {
return s[:j]
}
i++
}
return s
}

View File

@@ -13,6 +13,7 @@ import (
microstore "go-micro.dev/v4/store"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/registry"
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
@@ -70,7 +71,8 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()
evStream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
evStream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
logger.Error().Err(err).Msg("Failed to initialize event stream")
return err

View File

@@ -20,6 +20,7 @@ import (
"github.com/opencloud-eu/reva/v2/pkg/rhttp"
"go.opentelemetry.io/otel/trace"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/antivirus/pkg/config"
"github.com/opencloud-eu/opencloud/services/antivirus/pkg/scanners"
@@ -114,7 +115,8 @@ func (av Antivirus) Run() error {
av.config.Events.TLSInsecure = false
}
natsStream, err := stream.NatsFromConfig(av.config.Service.Name, false, stream.NatsConfig(av.config.Events))
connName := generators.GenerateConnectionName(av.config.Service.Name, generators.NTYPE_BUS)
natsStream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(av.config.Events))
if err != nil {
return err
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/services/audit/pkg/config"
"github.com/opencloud-eu/opencloud/services/audit/pkg/config/parser"
"github.com/opencloud-eu/opencloud/services/audit/pkg/logging"
@@ -36,7 +37,8 @@ func Server(cfg *config.Config) *cli.Command {
)
defer cancel()
client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
client, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/registry"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/pkg/version"
@@ -68,7 +69,8 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()
s, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
s, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}

View File

@@ -11,6 +11,7 @@ import (
microstore "go-micro.dev/v4/store"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/pkg/version"
@@ -55,7 +56,8 @@ func Server(cfg *config.Config) *cli.Command {
m.BuildInfo.WithLabelValues(version.GetString()).Set(1)
consumer, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
consumer, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}

View File

@@ -18,6 +18,7 @@ import (
"go-micro.dev/v4/metadata"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/middleware"
"github.com/opencloud-eu/opencloud/pkg/registry"
@@ -34,7 +35,8 @@ var _registeredEvents = []events.Unmarshaller{
// ListenForEvents listens for events and acts accordingly
func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) error {
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
bus, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
l.Error().Err(err).Msg("cannot connect to nats")
return err

View File

@@ -16,6 +16,7 @@ import (
"github.com/opencloud-eu/opencloud/pkg/account"
"github.com/opencloud-eu/opencloud/pkg/cors"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/keycloak"
"github.com/opencloud-eu/opencloud/pkg/middleware"
"github.com/opencloud-eu/opencloud/pkg/registry"
@@ -56,7 +57,8 @@ func Server(opts ...Option) (http.Service, error) {
if options.Config.Events.Endpoint != "" {
var err error
eventsStream, err = stream.NatsFromConfig(options.Config.Service.Name, false, stream.NatsConfig(options.Config.Events))
connName := generators.GenerateConnectionName(options.Config.Service.Name, generators.NTYPE_BUS)
eventsStream, err = stream.NatsFromConfig(connName, false, stream.NatsConfig(options.Config.Events))
if err != nil {
options.Logger.Error().
Err(err).

View File

@@ -1,6 +1,7 @@
package command
import (
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/services/notifications/pkg/config"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/events/stream"
@@ -31,7 +32,8 @@ func SendEmail(cfg *config.Config) *cli.Command {
if !daily && !weekly {
return errors.New("at least one of '--daily' or '--weekly' must be set")
}
s, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
s, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Notifications.Events))
if err != nil {
return err
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/registry"
"github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
@@ -94,7 +95,8 @@ func Server(cfg *config.Config) *cli.Command {
registeredEvents[typ.String()] = e
}
client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
client, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Notifications.Events))
if err != nil {
return err
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
@@ -104,7 +105,8 @@ func Server(cfg *config.Config) *cli.Command {
{
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
bus, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config/parser"
"github.com/opencloud-eu/reva/v2/pkg/events"
@@ -40,7 +41,8 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
stream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig{
Endpoint: cfg.Postprocessing.Events.Endpoint,
Cluster: cfg.Postprocessing.Events.Cluster,
EnableTLS: cfg.Postprocessing.Events.EnableTLS,

View File

@@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/version"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config"
@@ -46,7 +47,8 @@ var (
// NewPostprocessingService returns a new instance of a postprocessing service
func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.Store, tp trace.TracerProvider, cfg *config.Config) (*PostprocessingService, error) {
pub, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
pub, err := stream.NatsFromConfig(connName, false, stream.NatsConfig{
Endpoint: cfg.Postprocessing.Events.Endpoint,
Cluster: cfg.Postprocessing.Events.Cluster,
EnableTLS: cfg.Postprocessing.Events.EnableTLS,
@@ -55,11 +57,11 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.
AuthUsername: cfg.Postprocessing.Events.AuthUsername,
AuthPassword: cfg.Postprocessing.Events.AuthPassword,
})
if err != nil {
return nil, err
}
raw, err := raw.FromConfig(ctx, cfg.Service.Name, raw.Config{
raw, err := raw.FromConfig(ctx, connName, raw.Config{
Endpoint: cfg.Postprocessing.Events.Endpoint,
Cluster: cfg.Postprocessing.Events.Cluster,
EnableTLS: cfg.Postprocessing.Events.EnableTLS,

View File

@@ -12,6 +12,7 @@ import (
"github.com/justinas/alice"
"github.com/oklog/run"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
pkgmiddleware "github.com/opencloud-eu/opencloud/pkg/middleware"
"github.com/opencloud-eu/opencloud/pkg/oidc"
@@ -157,7 +158,8 @@ func Server(cfg *config.Config) *cli.Command {
var publisher events.Stream
if cfg.Events.Endpoint != "" {
var err error
publisher, err = stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
publisher, err = stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
logger.Error().
Err(err).

View File

@@ -24,6 +24,7 @@ import (
"go-micro.dev/v4/metadata"
grpcmetadata "google.golang.org/grpc/metadata"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/registry"
v0 "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0"
@@ -115,7 +116,8 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
// setup event handling
stream, err := raw.FromConfig(context.Background(), cfg.Service.Name, raw.Config{
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
stream, err := raw.FromConfig(context.Background(), connName, raw.Config{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,

View File

@@ -10,6 +10,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/services/sse/pkg/config"
@@ -52,7 +53,8 @@ func Server(cfg *config.Config) *cli.Command {
}
{
natsStream, err := stream.NatsFromConfig(cfg.Service.Name, true, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
natsStream, err := stream.NatsFromConfig(connName, true, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}

View File

@@ -1,6 +1,7 @@
package event
import (
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/services/storage-users/pkg/config"
"github.com/opencloud-eu/reva/v2/pkg/events/stream"
"go-micro.dev/v4/events"
@@ -8,7 +9,8 @@ import (
// NewStream prepares the requested nats stream and returns it.
func NewStream(cfg *config.Config) (events.Stream, error) {
return stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
return stream.NatsFromConfig(connName, false, stream.NatsConfig{
Endpoint: cfg.Events.Addr,
Cluster: cfg.Events.ClusterID,
EnableTLS: cfg.Events.EnableTLS,

View File

@@ -13,6 +13,7 @@ import (
microstore "go-micro.dev/v4/store"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/registry"
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
@@ -76,7 +77,8 @@ func Server(cfg *config.Config) *cli.Command {
defer cancel()
stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTYPE_BUS)
stream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}