fix(registry): Fix TTL and registration interval

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2024-07-19 11:33:26 +02:00
parent 6467a4797a
commit 0c0866711c
6 changed files with 51 additions and 11 deletions

View File

@@ -2,4 +2,5 @@ Bugfix: Repair nats-js-kv registry
The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.
https://github.com/owncloud/ocis/pull/9654
https://github.com/owncloud/ocis/pull/9620

View File

@@ -180,6 +180,10 @@ func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
natsjskv.EncodeKeys(),
}
if ttl, ok := opts.Context.Value(expiryKey{}).(time.Duration); ok {
storeoptions = append(storeoptions, natsjskv.DefaultTTL(ttl))
}
addr := []string{"127.0.0.1:9233"}
if len(opts.Addrs) > 0 {
addr = opts.Addrs

View File

@@ -0,0 +1,33 @@
package registry
import (
"os"
"time"
)
const (
_registryRegisterIntervalEnv = "EXPERIMENTAL_REGISTER_INTERVAL"
_registryRegisterTTLEnv = "EXPERIMENTAL_REGISTER_TTL"
// Note: _defaultRegisterInterval should always be lower than _defaultRegisterTTL
_defaultRegisterInterval = time.Second * 25
_defaultRegisterTTL = time.Second * 30
)
// GetRegisterInterval returns the register interval from the environment.
func GetRegisterInterval() time.Duration {
d, err := time.ParseDuration(os.Getenv(_registryRegisterIntervalEnv))
if err != nil {
return _defaultRegisterInterval
}
return d
}
// GetRegisterTTL returns the register TTL from the environment.
func GetRegisterTTL() time.Duration {
d, err := time.ParseDuration(os.Getenv(_registryRegisterTTLEnv))
if err != nil {
return _defaultRegisterTTL
}
return d
}

View File

@@ -31,11 +31,12 @@ var (
// Config is the config for a registry
type Config struct {
Type string `mapstructure:"type"`
Addresses []string `mapstructure:"addresses"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
DisableCache bool `mapstructure:"disable_cache"`
Type string `mapstructure:"type"`
Addresses []string `mapstructure:"addresses"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
DisableCache bool `mapstructure:"disable_cache"`
TTL time.Duration `mapstructure:"ttl"`
}
// Option allows configuring the registry
@@ -62,6 +63,7 @@ func GetRegistry(opts ...Option) mRegistry.Registry {
case "natsjs", "nats-js", "nats-js-kv": // for backwards compatibility - we will stick with one of those
_reg = natsjsregistry.NewRegistry(
mRegistry.Addrs(cfg.Addresses...),
natsjsregistry.ServiceExpiry(cfg.TTL),
)
case "memory":
_reg = memr.NewRegistry()
@@ -119,6 +121,8 @@ func getEnvs(opts ...Option) *Config {
cfg.Addresses = s
}
cfg.TTL = GetRegisterTTL()
for _, o := range opts {
o(cfg)
}

View File

@@ -4,7 +4,6 @@ import (
"crypto/tls"
"fmt"
"strings"
"time"
mgrpcs "github.com/go-micro/plugins/v4/server/grpc"
"github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus"
@@ -59,8 +58,8 @@ func NewServiceWithClient(client client.Client, opts ...Option) (Service, error)
micro.Version(sopts.Version),
micro.Context(sopts.Context),
micro.Registry(registry.GetRegistry()),
micro.RegisterTTL(time.Second * 30),
micro.RegisterInterval(time.Second * 10),
micro.RegisterTTL(registry.GetRegisterTTL()),
micro.RegisterInterval(registry.GetRegisterInterval()),
micro.WrapHandler(prometheus.NewHandlerWrapper()),
micro.WrapClient(mtracer.NewClientWrapper(
mtracer.WithTraceProvider(sopts.TraceProvider),

View File

@@ -4,7 +4,6 @@ import (
"crypto/tls"
"fmt"
"strings"
"time"
"github.com/owncloud/ocis/v2/ocis-pkg/broker"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
@@ -65,8 +64,8 @@ func NewService(opts ...Option) (Service, error) {
micro.Context(sopts.Context),
micro.Flags(sopts.Flags...),
micro.Registry(registry.GetRegistry()),
micro.RegisterTTL(time.Second * 30),
micro.RegisterInterval(time.Second * 10),
micro.RegisterTTL(registry.GetRegisterTTL()),
micro.RegisterInterval(registry.GetRegisterInterval()),
micro.WrapClient(mtracer.NewClientWrapper(
mtracer.WithTraceProvider(sopts.TraceProvider),
)),