Merge pull request #9620 from kobergj/FixNatsjsRegistry

Repair Natsjskv Registry
This commit is contained in:
kobergj
2024-07-18 09:57:00 +02:00
committed by GitHub
7 changed files with 159 additions and 43 deletions

View File

@@ -0,0 +1,5 @@
Bugfix: Repair nats-js-kv registry
The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.
https://github.com/owncloud/ocis/pull/9620

2
go.mod
View File

@@ -365,6 +365,8 @@ replace github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-2
replace github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c
replace github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf
// exclude the v2 line of go-sqlite3 which was released accidentally and prevents pulling in newer versions of go-sqlite3
// see https://github.com/mattn/go-sqlite3/issues/965 for more details
exclude github.com/mattn/go-sqlite3 v2.0.3+incompatible

4
go.sum
View File

@@ -1218,8 +1218,6 @@ github.com/go-micro/plugins/v4/server/http v1.2.2 h1:UK2/09AU0zV3wHELuR72TZzVU2v
github.com/go-micro/plugins/v4/server/http v1.2.2/go.mod h1:YuAjaSPxcn3LI8j2FUsqx0Rxunrj4YwDV41Ax76rLl0=
github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0 h1:Qa1EBQ9UyCGecFAJQovl/MHGnvbcvDaM3qUoAG5Lnvk=
github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0/go.mod h1:aCRl8JQmqIaonOl88nFPY/BOQnHPVHY9ngStzLkXnYk=
github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e h1:hwH0qXT0J3UFYRi0UD+e3ItL92oW+jdPFA+3o/j6ASg=
github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e/go.mod h1:Goi4eJ9SrKkxE6NsAVqBVNxfQFbwb7UbyII6743ldgM=
github.com/go-micro/plugins/v4/store/redis v1.2.1 h1:d9kwr9bSpoK9vkHkqcv+isQUbgBCHpfwCV57pcAPS6c=
github.com/go-micro/plugins/v4/store/redis v1.2.1/go.mod h1:MbCG0YiyPqETTtm7uHFmxQNCaW1o9hBoYtFwhbVjLUg=
github.com/go-micro/plugins/v4/transport/grpc v1.1.0 h1:mXfDYfFQLnVDzjGY3o84oe4prfux9h8txsnA19dKsj8=
@@ -1613,6 +1611,8 @@ github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf h1:X4Hm7mZFAE+vJZ62mcXuH9BywmKiAr9B4V5LQLcTr70=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY=
github.com/kolo/xmlrpc v0.0.0-20200310150728-e0350524596b/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=

View File

@@ -12,6 +12,7 @@ import (
"time"
natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/store"
@@ -23,6 +24,8 @@ var (
_registryAddressEnv = "MICRO_REGISTRY_ADDRESS"
_registryUsernameEnv = "MICRO_REGISTRY_AUTH_USERNAME"
_registryPasswordEnv = "MICRO_REGISTRY_AUTH_PASSWORD"
_serviceDelimiter = "/"
)
func init() {
@@ -80,71 +83,51 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti
if s == nil {
return errors.New("wont store nil service")
}
unique := uuid.New().String()
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
s.Metadata["uuid"] = unique
b, err := json.Marshal(s)
if err != nil {
return err
}
return n.store.Write(&store.Record{
Key: s.Name,
Key: s.Name + _serviceDelimiter + unique,
Value: b,
Expiry: n.expiry,
})
}
// Deregister removes a service from the registry
// Deregister removes a service from the registry.
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()
return n.store.Delete(s.Name)
var unique string
if s.Metadata != nil {
unique = s.Metadata["uuid"]
}
return n.store.Delete(s.Name + _serviceDelimiter + unique)
}
// GetService gets a specific service from the registry
func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()
recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
svcs := make([]*registry.Service, 0, len(recs))
for _, rec := range recs {
var s registry.Service
if err := json.Unmarshal(rec.Value, &s); err != nil {
return nil, err
}
svcs = append(svcs, &s)
}
return svcs, nil
// avoid listing e.g. `webfinger` when requesting `web` by adding the delimiter to the service name
return n.listServices(store.ListPrefix(s + _serviceDelimiter))
}
// ListServices lists all registered services
func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()
keys, err := n.store.List()
if err != nil {
return nil, err
}
var svcs []*registry.Service
for _, k := range keys {
s, err := n.GetService(k)
if err != nil {
// TODO: continue ?
return nil, err
}
svcs = append(svcs, s...)
}
return svcs, nil
return n.listServices()
}
// Watch allowes following the changes in the registry if it would be implemented
func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
return nil, errors.New("watcher not implemented")
return NewWatcher(n)
}
// String returns the name of the registry
@@ -152,6 +135,43 @@ func (n *storeregistry) String() string {
return n.typ
}
func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()
keys, err := n.store.List(opts...)
if err != nil {
return nil, err
}
svcs := make([]*registry.Service, 0, len(keys))
for _, k := range keys {
s, err := n.getService(k)
if err != nil {
// TODO: continue ?
return nil, err
}
svcs = append(svcs, s)
}
return svcs, nil
}
func (n *storeregistry) getService(s string) (*registry.Service, error) {
recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
if len(recs) == 0 {
return nil, registry.ErrNotFound
}
var svc registry.Service
if err := json.Unmarshal(recs[0].Value, &svc); err != nil {
return nil, err
}
return &svc, nil
}
func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
storeoptions := []store.Option{
store.Database("service-registry"),

View File

@@ -0,0 +1,74 @@
package natsjsregistry
import (
"errors"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
)
// NatsWatcher is the watcher of the nats interface
type NatsWatcher interface {
WatchAll(bucket string, opts ...nats.WatchOpt) (nats.KeyWatcher, error)
}
// Watcher is used to keep track of changes in the registry
type Watcher struct {
watch nats.KeyWatcher
updates <-chan nats.KeyValueEntry
reg *storeregistry
}
// NewWatcher returns a new watcher
func NewWatcher(s *storeregistry) (*Watcher, error) {
w, ok := s.store.(NatsWatcher)
if !ok {
return nil, errors.New("store does not implement watcher interface")
}
watcher, err := w.WatchAll("service-registry")
if err != nil {
return nil, err
}
return &Watcher{
watch: watcher,
updates: watcher.Updates(),
reg: s,
}, nil
}
// Next returns the next result. It is a blocking call
func (w *Watcher) Next() (*registry.Result, error) {
kve := <-w.updates
if kve == nil {
return nil, errors.New("watcher stopped")
}
service, err := w.reg.getService(kve.Key())
if err != nil {
return nil, err
}
var action string
switch kve.Operation() {
default:
action = "create"
case nats.KeyValuePut:
action = "create"
case nats.KeyValueDelete:
action = "delete"
case nats.KeyValuePurge:
action = "delete"
}
return &registry.Result{
Service: service,
Action: action,
}, nil
}
// Stop stops the watcher
func (w *Watcher) Stop() {
_ = w.watch.Stop()
}

View File

@@ -335,6 +335,20 @@ func (n *natsStore) String() string {
return "NATS JetStream KeyValueStore"
}
// WatchAll exposes the watcher interface from the underlying JetStreamContext.
func (n *natsStore) WatchAll(bucket string, opts ...nats.WatchOpt) (nats.KeyWatcher, error) {
if bucket == "" {
return nil, errors.New("multi bucket watching is not supported")
}
b, err := n.js.KeyValue(bucket)
if err != nil {
return nil, errors.Wrap(err, "Failed to get bucket")
}
return b.WatchAll(opts...)
}
// thread safe way to initialize the connection.
func (n *natsStore) initConn() error {
if n.hasConn() {
@@ -397,7 +411,7 @@ func (n *natsStore) mustGetBucket(kv *nats.KeyValueConfig) (nats.KeyValue, error
func (n *natsStore) getRecord(bucket nats.KeyValue, key string) (*store.Record, bool, error) {
obj, err := bucket.Get(key)
if errors.Is(err, nats.ErrKeyNotFound) {
return nil, false, nil
return nil, false, store.ErrNotFound
} else if err != nil {
return nil, false, errors.Wrap(err, "Failed to get object from bucket")
}

3
vendor/modules.txt vendored
View File

@@ -966,7 +966,7 @@ github.com/go-micro/plugins/v4/server/http
# github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0
## explicit; go 1.21
github.com/go-micro/plugins/v4/store/nats-js
# github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e
# github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf
## explicit; go 1.21
github.com/go-micro/plugins/v4/store/nats-js-kv
# github.com/go-micro/plugins/v4/store/redis v1.2.1
@@ -2435,3 +2435,4 @@ stash.kopano.io/kgol/rndm
# github.com/studio-b12/gowebdav => github.com/aduffeck/gowebdav v0.0.0-20231215102054-212d4a4374f6
# github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-20240123094924-5af178158eaf
# github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c
# github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf