fix(natsjsregistry): spread load evenly among instances

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2024-07-16 12:19:48 +02:00
parent 4c613776ec
commit 60d4305bab
3 changed files with 138 additions and 39 deletions

View File

@@ -0,0 +1,5 @@
Fix: Repair nats-js-kv registry
The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.
https://github.com/owncloud/ocis/pull/9618

View File

@@ -12,6 +12,7 @@ import (
"time"
natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/store"
@@ -23,6 +24,8 @@ var (
_registryAddressEnv = "MICRO_REGISTRY_ADDRESS"
_registryUsernameEnv = "MICRO_REGISTRY_AUTH_USERNAME"
_registryPasswordEnv = "MICRO_REGISTRY_AUTH_PASSWORD"
_serviceDelimiter = "/"
)
func init() {
@@ -80,71 +83,51 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti
if s == nil {
return errors.New("wont store nil service")
}
unique := uuid.New().String()
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
s.Metadata["uuid"] = unique
b, err := json.Marshal(s)
if err != nil {
return err
}
return n.store.Write(&store.Record{
Key: s.Name,
Key: s.Name + _serviceDelimiter + unique,
Value: b,
Expiry: n.expiry,
})
}
// Deregister removes a service from the registry
// Deregister removes a service from the registry.
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()
return n.store.Delete(s.Name)
var unique string
if s.Metadata != nil {
unique = s.Metadata["uuid"]
}
return n.store.Delete(s.Name + _serviceDelimiter + unique)
}
// GetService gets a specific service from the registry
func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()
recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
svcs := make([]*registry.Service, 0, len(recs))
for _, rec := range recs {
var s registry.Service
if err := json.Unmarshal(rec.Value, &s); err != nil {
return nil, err
}
svcs = append(svcs, &s)
}
return svcs, nil
// avoid listing e.g. `webfinger` when requesting `web` by adding the delimiter to the service name
return n.listServices(store.ListPrefix(s + _serviceDelimiter))
}
// ListServices lists all registered services
func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()
keys, err := n.store.List()
if err != nil {
return nil, err
}
var svcs []*registry.Service
for _, k := range keys {
s, err := n.GetService(k)
if err != nil {
// TODO: continue ?
return nil, err
}
svcs = append(svcs, s...)
}
return svcs, nil
return n.listServices()
}
// Watch allowes following the changes in the registry if it would be implemented
func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
return nil, errors.New("watcher not implemented")
return NewWatcher(n)
}
// String returns the name of the registry
@@ -152,6 +135,43 @@ func (n *storeregistry) String() string {
return n.typ
}
func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()
keys, err := n.store.List(opts...)
if err != nil {
return nil, err
}
svcs := make([]*registry.Service, 0, len(keys))
for _, k := range keys {
s, err := n.getService(k)
if err != nil {
// TODO: continue ?
return nil, err
}
svcs = append(svcs, s)
}
return svcs, nil
}
func (n *storeregistry) getService(s string) (*registry.Service, error) {
recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
if len(recs) == 0 {
return nil, registry.ErrNotFound
}
var svc registry.Service
if err := json.Unmarshal(recs[0].Value, &svc); err != nil {
return nil, err
}
return &svc, nil
}
func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
storeoptions := []store.Option{
store.Database("service-registry"),

View File

@@ -0,0 +1,74 @@
package natsjsregistry
import (
"errors"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
)
// NatsWatcher is the watcher of the nats interface
type NatsWatcher interface {
Watch(bucket string) (nats.KeyWatcher, error)
}
// Watcher is used to keep track of changes in the registry
type Watcher struct {
watch nats.KeyWatcher
updates <-chan nats.KeyValueEntry
reg *storeregistry
}
// NewWatcher returns a new watcher
func NewWatcher(s *storeregistry) (*Watcher, error) {
w, ok := s.store.(NatsWatcher)
if !ok {
return nil, errors.New("store does not implement watcher interface")
}
watcher, err := w.Watch("service-registry")
if err != nil {
return nil, err
}
return &Watcher{
watch: watcher,
updates: watcher.Updates(),
reg: s,
}, nil
}
// Next returns the next result. It is a blocking call
func (w *Watcher) Next() (*registry.Result, error) {
kve := <-w.updates
if kve == nil {
return nil, errors.New("watcher stopped")
}
service, err := w.reg.getService(kve.Key())
if err != nil {
return nil, err
}
var action string
switch kve.Operation() {
default:
action = "create"
case nats.KeyValuePut:
action = "create"
case nats.KeyValueDelete:
action = "delete"
case nats.KeyValuePurge:
action = "delete"
}
return &registry.Result{
Service: service,
Action: action,
}, nil
}
// Stop stops the watcher
func (w *Watcher) Stop() {
_ = w.watch.Stop()
}