Merge pull request #9740 from owncloud/nats-registry-fixes

Nats registry fixes
This commit is contained in:
Jörn Friedrich Dreyer
2024-08-07 15:02:03 +02:00
committed by GitHub
4 changed files with 33 additions and 9 deletions

View File

@@ -1,6 +1,8 @@
Bugfix: Repair nats-js-kv registry
The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.
Internally, it can now distinguish services by version and will aggregate all nodes of the same version into a single service, as expected by the registry cache and watcher.
https://github.com/owncloud/ocis/pull/9734
https://github.com/owncloud/ocis/pull/9726
https://github.com/owncloud/ocis/pull/9656

View File

@@ -95,7 +95,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO
return err
}
return n.store.Write(&store.Record{
Key: s.Name + _serviceDelimiter + server.DefaultId,
Key: s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version,
Value: b,
Expiry: options.TTL,
})
@@ -105,7 +105,7 @@ func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterO
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()
return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId)
return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version)
}
// GetService gets a specific service from the registry
@@ -138,20 +138,28 @@ func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Serv
return nil, err
}
svcs := make([]*registry.Service, 0, len(keys))
versions := map[string]*registry.Service{}
for _, k := range keys {
s, err := n.getService(k)
s, err := n.getNode(k)
if err != nil {
// TODO: continue ?
return nil, err
}
if versions[s.Version] == nil {
versions[s.Version] = s
} else {
versions[s.Version].Nodes = append(versions[s.Version].Nodes, s.Nodes...)
}
}
svcs := make([]*registry.Service, 0, len(versions))
for _, s := range versions {
svcs = append(svcs, s)
}
return svcs, nil
}
func (n *storeregistry) getService(s string) (*registry.Service, error) {
// getNode retrieves a node from the store. It returns a service to also keep track of the version.
func (n *storeregistry) getNode(s string) (*registry.Service, error) {
recs, err := n.store.Read(s)
if err != nil {
return nil, err

View File

@@ -3,6 +3,7 @@ package natsjsregistry
import (
"encoding/json"
"errors"
"strings"
natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/nats-io/nats.go"
@@ -48,9 +49,21 @@ func (w *Watcher) Next() (*registry.Result, error) {
}
var svc registry.Service
if err := json.Unmarshal(kve.Value.Data, &svc); err != nil {
_ = w.stop()
return nil, err
if kve.Value.Data == nil {
// fake a service
parts := strings.SplitN(kve.Value.Key, _serviceDelimiter, 3)
if len(parts) != 3 {
return nil, errors.New("invalid service key")
}
svc.Name = parts[0]
// ocis registers nodes with a - separator
svc.Nodes = []*registry.Node{{Id: parts[0] + "-" + parts[1]}}
svc.Version = parts[2]
} else {
if err := json.Unmarshal(kve.Value.Data, &svc); err != nil {
_ = w.stop()
return nil, err
}
}
return &registry.Result{

View File

@@ -65,6 +65,7 @@ func BuildHTTPService(serviceID, address string, version string) *mRegistry.Serv
}
node := &mRegistry.Node{
// This id is read by the registry watcher
Id: serviceID + "-" + server.DefaultId,
Address: net.JoinHostPort(addr, fmt.Sprint(port)),
Metadata: make(map[string]string),