mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-07 12:50:21 -06:00
reuse default node id when registering services
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
@@ -2,6 +2,7 @@ Bugfix: Repair nats-js-kv registry
|
||||
|
||||
The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.
|
||||
|
||||
https://github.com/owncloud/ocis/pull/9656
|
||||
https://github.com/owncloud/ocis/pull/9662
|
||||
https://github.com/owncloud/ocis/pull/9654
|
||||
https://github.com/owncloud/ocis/pull/9620
|
||||
|
||||
@@ -12,9 +12,9 @@ import (
|
||||
"time"
|
||||
|
||||
natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
|
||||
"github.com/google/uuid"
|
||||
"github.com/nats-io/nats.go"
|
||||
"go-micro.dev/v4/registry"
|
||||
"go-micro.dev/v4/server"
|
||||
"go-micro.dev/v4/store"
|
||||
"go-micro.dev/v4/util/cmd"
|
||||
)
|
||||
@@ -25,7 +25,7 @@ var (
|
||||
_registryUsernameEnv = "MICRO_REGISTRY_AUTH_USERNAME"
|
||||
_registryPasswordEnv = "MICRO_REGISTRY_AUTH_PASSWORD"
|
||||
|
||||
_serviceDelimiter = "/"
|
||||
_serviceDelimiter = "@"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -90,18 +90,12 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO
|
||||
o(&options)
|
||||
}
|
||||
|
||||
unique := uuid.New().String()
|
||||
if s.Metadata == nil {
|
||||
s.Metadata = make(map[string]string)
|
||||
}
|
||||
s.Metadata["uuid"] = unique
|
||||
|
||||
b, err := json.Marshal(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return n.store.Write(&store.Record{
|
||||
Key: s.Name + _serviceDelimiter + unique,
|
||||
Key: s.Name + _serviceDelimiter + server.DefaultId,
|
||||
Value: b,
|
||||
Expiry: options.TTL,
|
||||
})
|
||||
@@ -111,13 +105,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO
|
||||
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
|
||||
n.lock.RLock()
|
||||
defer n.lock.RUnlock()
|
||||
|
||||
var unique string
|
||||
if s.Metadata != nil {
|
||||
unique = s.Metadata["uuid"]
|
||||
}
|
||||
|
||||
return n.store.Delete(s.Name + _serviceDelimiter + unique)
|
||||
return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId)
|
||||
}
|
||||
|
||||
// GetService gets a specific service from the registry
|
||||
|
||||
@@ -7,10 +7,11 @@ import (
|
||||
"strings"
|
||||
|
||||
mRegistry "go-micro.dev/v4/registry"
|
||||
"go-micro.dev/v4/server"
|
||||
"go-micro.dev/v4/util/addr"
|
||||
)
|
||||
|
||||
func BuildGRPCService(serviceID, uuid, address string, version string) *mRegistry.Service {
|
||||
func BuildGRPCService(serviceID, address string, version string) *mRegistry.Service {
|
||||
var host string
|
||||
var port int
|
||||
|
||||
@@ -28,7 +29,7 @@ func BuildGRPCService(serviceID, uuid, address string, version string) *mRegistr
|
||||
}
|
||||
|
||||
node := &mRegistry.Node{
|
||||
Id: serviceID + "-" + uuid,
|
||||
Id: serviceID + "-" + server.DefaultId,
|
||||
Address: net.JoinHostPort(addr, fmt.Sprint(port)),
|
||||
Metadata: make(map[string]string),
|
||||
}
|
||||
@@ -46,7 +47,7 @@ func BuildGRPCService(serviceID, uuid, address string, version string) *mRegistr
|
||||
}
|
||||
}
|
||||
|
||||
func BuildHTTPService(serviceID, uuid, address string, version string) *mRegistry.Service {
|
||||
func BuildHTTPService(serviceID, address string, version string) *mRegistry.Service {
|
||||
var host string
|
||||
var port int
|
||||
|
||||
@@ -64,7 +65,7 @@ func BuildHTTPService(serviceID, uuid, address string, version string) *mRegistr
|
||||
}
|
||||
|
||||
node := &mRegistry.Node{
|
||||
Id: serviceID + "-" + uuid,
|
||||
Id: serviceID + "-" + server.DefaultId,
|
||||
Address: net.JoinHostPort(addr, fmt.Sprint(port)),
|
||||
Metadata: make(map[string]string),
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ func NewServiceWithClient(client client.Client, opts ...Option) (Service, error)
|
||||
var mServer server.Server
|
||||
sopts := newOptions(opts...)
|
||||
tlsConfig := &tls.Config{}
|
||||
|
||||
if sopts.TLSEnabled {
|
||||
var cert tls.Certificate
|
||||
var err error
|
||||
|
||||
@@ -82,7 +82,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
cancel()
|
||||
})
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/mime"
|
||||
"github.com/gofrs/uuid"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/version"
|
||||
@@ -19,7 +18,7 @@ import (
|
||||
// There are no explicit requirements for the context, and it will be passed
|
||||
// without changes to the underlying RegisterService method.
|
||||
func RegisterOcisService(ctx context.Context, cfg *config.Config, logger log.Logger) error {
|
||||
svc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name+"."+cfg.App.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
svc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name+"."+cfg.App.Name, cfg.GRPC.Addr, version.GetString())
|
||||
return registry.RegisterService(ctx, svc, logger)
|
||||
}
|
||||
|
||||
|
||||
@@ -86,7 +86,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
httpSvc := registry.BuildHTTPService(cfg.HTTP.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.HTTP.Addr, version.GetString())
|
||||
httpSvc := registry.BuildHTTPService(cfg.HTTP.Namespace+"."+cfg.Service.Name, cfg.HTTP.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, httpSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the http service")
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
cancel()
|
||||
})
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
func init() {
|
||||
r := registry.GetRegistry(registry.Inmemory())
|
||||
service := registry.BuildGRPCService("com.owncloud.api.gateway", "", "", "")
|
||||
service := registry.BuildGRPCService("com.owncloud.api.gateway", "", "")
|
||||
service.Nodes = []*mRegistry.Node{{
|
||||
Address: "any",
|
||||
}}
|
||||
|
||||
@@ -95,7 +95,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
func init() {
|
||||
r := registry.GetRegistry(registry.Inmemory())
|
||||
service := registry.BuildGRPCService("com.owncloud.api.gateway", "", "", "")
|
||||
service := registry.BuildGRPCService("com.owncloud.api.gateway", "", "")
|
||||
service.Nodes = []*mRegistry.Node{{
|
||||
Address: "any",
|
||||
}}
|
||||
|
||||
@@ -83,12 +83,12 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
httpSvc := registry.BuildHTTPService(cfg.HTTP.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.HTTP.Addr, version.GetString())
|
||||
httpSvc := registry.BuildHTTPService(cfg.HTTP.Namespace+"."+cfg.Service.Name, cfg.HTTP.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, httpSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the http service")
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
func init() {
|
||||
r := registry.GetRegistry(registry.Inmemory())
|
||||
service := registry.BuildGRPCService("com.owncloud.api.gateway", "", "", "")
|
||||
service := registry.BuildGRPCService("com.owncloud.api.gateway", "", "")
|
||||
service.Nodes = []*mRegistry.Node{{
|
||||
Address: "any",
|
||||
}}
|
||||
|
||||
@@ -99,7 +99,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -82,12 +82,12 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
httpScv := registry.BuildHTTPService(cfg.HTTP.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.HTTP.Addr, version.GetString())
|
||||
httpScv := registry.BuildHTTPService(cfg.HTTP.Namespace+"."+cfg.Service.Name, cfg.HTTP.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, httpScv, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the http service")
|
||||
}
|
||||
|
||||
@@ -93,7 +93,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
func init() {
|
||||
r := registry.GetRegistry(registry.Inmemory())
|
||||
service := registry.BuildGRPCService("com.owncloud.api.gateway", "", "", "")
|
||||
service := registry.BuildGRPCService("com.owncloud.api.gateway", "", "")
|
||||
service.Nodes = []*mRegistry.Node{{
|
||||
Address: "any",
|
||||
}}
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
func init() {
|
||||
r := registry.GetRegistry(registry.Inmemory())
|
||||
service := registry.BuildGRPCService("com.owncloud.api.gateway", "", "", "")
|
||||
service := registry.BuildGRPCService("com.owncloud.api.gateway", "", "")
|
||||
service.Nodes = []*mRegistry.Node{{
|
||||
Address: "any",
|
||||
}}
|
||||
|
||||
@@ -95,7 +95,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
sync.Trap(&gr, cancel)
|
||||
}
|
||||
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, uuid.Must(uuid.NewV4()).String(), cfg.GRPC.Addr, version.GetString())
|
||||
grpcSvc := registry.BuildGRPCService(cfg.GRPC.Namespace+"."+cfg.Service.Name, cfg.GRPC.Addr, version.GetString())
|
||||
if err := registry.RegisterService(ctx, grpcSvc, logger); err != nil {
|
||||
logger.Fatal().Err(err).Msg("failed to register the grpc service")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user