mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2025-12-31 01:10:20 -06:00
fix(natsjsregistry): fix reconnects
Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
5
changelog/unreleased/nats-reconnects.md
Normal file
5
changelog/unreleased/nats-reconnects.md
Normal file
@@ -0,0 +1,5 @@
|
||||
Bugfix: Nats reconnects
|
||||
|
||||
Natsjs kv registry could not handle reconnects correctly. This fixes it.
|
||||
|
||||
https://github.com/owncloud/ocis/pull/8880
|
||||
@@ -5,8 +5,10 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
|
||||
@@ -36,12 +38,13 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
|
||||
o(&options)
|
||||
}
|
||||
exp, _ := options.Context.Value(expiryKey{}).(time.Duration)
|
||||
return &storeregistry{
|
||||
n := &storeregistry{
|
||||
opts: options,
|
||||
store: natsjskv.NewStore(storeOptions(options)...),
|
||||
typ: _registryName,
|
||||
expiry: exp,
|
||||
}
|
||||
n.store = natsjskv.NewStore(n.storeOptions(options)...)
|
||||
return n
|
||||
}
|
||||
|
||||
type storeregistry struct {
|
||||
@@ -49,14 +52,19 @@ type storeregistry struct {
|
||||
store store.Store
|
||||
typ string
|
||||
expiry time.Duration
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
// Init inits the registry
|
||||
func (n *storeregistry) Init(opts ...registry.Option) error {
|
||||
n.lock.Lock()
|
||||
defer n.lock.Unlock()
|
||||
|
||||
for _, o := range opts {
|
||||
o(&n.opts)
|
||||
}
|
||||
return n.store.Init(storeOptions(n.opts)...)
|
||||
n.store = natsjskv.NewStore(n.storeOptions(n.opts)...)
|
||||
return n.store.Init(n.storeOptions(n.opts)...)
|
||||
}
|
||||
|
||||
// Options returns the configured options
|
||||
@@ -66,6 +74,9 @@ func (n *storeregistry) Options() registry.Options {
|
||||
|
||||
// Register adds a service to the registry
|
||||
func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOption) error {
|
||||
n.lock.RLock()
|
||||
defer n.lock.RUnlock()
|
||||
|
||||
if s == nil {
|
||||
return errors.New("wont store nil service")
|
||||
}
|
||||
@@ -82,11 +93,17 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti
|
||||
|
||||
// Deregister removes a service from the registry
|
||||
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
|
||||
n.lock.RLock()
|
||||
defer n.lock.RUnlock()
|
||||
|
||||
return n.store.Delete(s.Name)
|
||||
}
|
||||
|
||||
// GetService gets a specific service from the registry
|
||||
func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) {
|
||||
n.lock.RLock()
|
||||
defer n.lock.RUnlock()
|
||||
|
||||
recs, err := n.store.Read(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -104,6 +121,9 @@ func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*regist
|
||||
|
||||
// ListServices lists all registered services
|
||||
func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) {
|
||||
n.lock.RLock()
|
||||
defer n.lock.RUnlock()
|
||||
|
||||
keys, err := n.store.List()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -132,7 +152,7 @@ func (n *storeregistry) String() string {
|
||||
return n.typ
|
||||
}
|
||||
|
||||
func storeOptions(opts registry.Options) []store.Option {
|
||||
func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
|
||||
storeoptions := []store.Option{
|
||||
store.Database("service-registry"),
|
||||
store.Table("service-registry"),
|
||||
@@ -150,6 +170,16 @@ func storeOptions(opts registry.Options) []store.Option {
|
||||
natsOptions := nats.GetDefaultOptions()
|
||||
natsOptions.Name = "nats-js-kv-registry"
|
||||
natsOptions.User, natsOptions.Password = getAuth()
|
||||
natsOptions.ReconnectedCB = func(_ *nats.Conn) {
|
||||
if err := n.Init(); err != nil {
|
||||
fmt.Println("cannot reconnect to nats")
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
natsOptions.ClosedCB = func(_ *nats.Conn) {
|
||||
fmt.Println("nats connection closed")
|
||||
os.Exit(1)
|
||||
}
|
||||
storeoptions = append(storeoptions, natsjskv.NatsOptions(natsOptions))
|
||||
|
||||
if so, ok := opts.Context.Value(storeOptionsKey{}).([]store.Option); ok {
|
||||
|
||||
Reference in New Issue
Block a user