bump go-micro nats-js store

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-10-18 10:48:36 +02:00
parent 7f57f68725
commit c8f487d804
6 changed files with 122 additions and 68 deletions

View File

@@ -6,7 +6,7 @@ import (
"go-micro.dev/v4/store"
)
// setStoreOption returns a function to setup a context with given value
// setStoreOption returns a function to setup a context with given value.
func setStoreOption(k, v interface{}) store.Option {
return func(o *store.Options) {
if o.Context == nil {

View File

@@ -34,14 +34,14 @@ type natsStore struct {
conn *nats.Conn
js nats.JetStreamContext
buckets map[string]nats.ObjectStore
buckets *sync.Map
}
func init() {
cmd.DefaultStores["natsjs"] = NewStore
}
// NewStore will create a new NATS JetStream Object Store
// NewStore will create a new NATS JetStream Object Store.
func NewStore(opts ...store.Option) store.Store {
options := store.Options{
Nodes: []string{},
@@ -55,7 +55,7 @@ func NewStore(opts ...store.Option) store.Store {
opts: options,
jsopts: []nats.JSOpt{},
objStoreConfigs: []*nats.ObjectStoreConfig{},
buckets: map[string]nats.ObjectStore{},
buckets: &sync.Map{},
storageType: nats.FileStorage,
}
@@ -64,7 +64,9 @@ func NewStore(opts ...store.Option) store.Store {
return n
}
// Init initialises the store. It must perform any required setup on the backing storage implementation and check that it is ready for use, returning any errors.
// Init initializes the store. It must perform any required setup on the
// backing storage implementation and check that it is ready for use,
// returning any errors.
func (n *natsStore) Init(opts ...store.Option) error {
n.setOption(opts...)
@@ -101,7 +103,7 @@ func (n *natsStore) Init(opts ...store.Option) error {
if err != nil {
return errors.Wrapf(err, "Failed to create bucket (%s)", cfg.Bucket)
}
n.buckets[cfg.Bucket] = store
n.buckets.Store(cfg.Bucket, store)
}
return nil
@@ -159,10 +161,8 @@ func (n *natsStore) Options() store.Options {
// Read takes a single key name and optional ReadOptions. It returns matching []*Record or an error.
func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
if n.conn == nil {
if err := n.Init(); err != nil {
return nil, err
}
if err := n.initConn(); err != nil {
return nil, err
}
opt := store.ReadOptions{}
@@ -178,51 +178,70 @@ func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
opt.Table = n.opts.Table
}
bucket, ok := n.buckets[opt.Database]
b, ok := n.buckets.Load(opt.Database)
if !ok {
return nil, ErrBucketNotFound
}
bucket := b.(nats.ObjectStore)
var keys []string
objects, err := bucket.List()
if err == nats.ErrNoObjectsFound {
return []*store.Record{}, nil
} else if err != nil {
return nil, errors.Wrap(err, "Failed to list objects")
var keyPrefix, keySuffix string
switch {
case opt.Prefix:
keyPrefix = getKey(key, opt.Table)
case opt.Suffix:
keySuffix = key
default:
keys = []string{getKey(key, opt.Table)}
}
for _, obj := range objects {
name := obj.Name
if (!opt.Prefix && !opt.Suffix) && getKey(key, opt.Table) != name {
continue
}
if opt.Prefix && !strings.HasPrefix(name, getKey(key, opt.Table)) {
continue
if len(keys) == 0 {
objects, err := bucket.List()
if err == nats.ErrNoObjectsFound {
return []*store.Record{}, nil
} else if err != nil {
return nil, errors.Wrap(err, "Failed to list objects")
}
if opt.Suffix && !strings.HasSuffix(name, key) {
continue
for _, obj := range objects {
name := obj.Name
if !strings.HasPrefix(name, opt.Table) {
continue
}
if (!opt.Prefix && !opt.Suffix) && key != name {
continue
}
if opt.Prefix && !strings.HasPrefix(name, keyPrefix) {
continue
}
if opt.Suffix && !strings.HasSuffix(name, keySuffix) {
continue
}
keys = append(keys, name)
}
keys = append(keys, name)
}
records := []*store.Record{}
for _, key := range keys {
obj, err := bucket.Get(key)
if err != nil {
if err == nats.ErrObjectNotFound {
return []*store.Record{}, nil
} else if err != nil {
return nil, errors.Wrap(err, "Failed to get object from bucket")
}
b, err := io.ReadAll(obj)
if err != nil {
return nil, errors.Wrap(err, "Failed to read returned bytes")
}
defer obj.Close()
info, err := obj.Info()
if err != nil {
return nil, errors.Wrap(err, "Failed to fetch record info")
}
if info.Deleted {
continue
}
metadata := map[string]interface{}{}
for key, value := range info.Headers {
var val interface{}
@@ -232,14 +251,17 @@ func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
metadata[key] = val
}
b, err := io.ReadAll(obj)
if err != nil {
return nil, errors.Wrap(err, "Failed to read returned bytes")
}
records = append(records, &store.Record{
Key: key,
Value: b,
Metadata: metadata,
})
// Why is there a close method?
obj.Close()
}
if opt.Limit > 0 {
@@ -253,10 +275,8 @@ func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
// Write writes a record to the store, and returns an error if the record was not written.
func (n *natsStore) Write(r *store.Record, opts ...store.WriteOption) error {
if n.conn == nil {
if err := n.Init(); err != nil {
return err
}
if err := n.initConn(); err != nil {
return err
}
opt := store.WriteOptions{}
@@ -272,8 +292,8 @@ func (n *natsStore) Write(r *store.Record, opts ...store.WriteOption) error {
opt.Table = n.opts.Table
}
store, ok := n.buckets[opt.Database]
s, ok := n.buckets.Load(opt.Database)
store, _ := s.(nats.ObjectStore)
// Create new bucket if not exists
if !ok {
var err error
@@ -307,10 +327,8 @@ func (n *natsStore) Write(r *store.Record, opts ...store.WriteOption) error {
// Delete removes the record with the corresponding key from the store.
func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error {
if n.conn == nil {
if err := n.Init(); err != nil {
return err
}
if err := n.initConn(); err != nil {
return err
}
opt := store.DeleteOptions{}
@@ -327,18 +345,19 @@ func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error {
}
if opt.Table == "DELETE_BUCKET" {
delete(n.buckets, key)
n.buckets.Delete(key)
if err := n.js.DeleteObjectStore(key); err != nil {
return errors.Wrap(err, "Failed to delete bucket")
}
return nil
}
store, ok := n.buckets[opt.Database]
s, ok := n.buckets.Load(opt.Database)
if !ok {
return ErrBucketNotFound
}
store := s.(nats.ObjectStore)
if err := store.Delete(getKey(key, opt.Table)); err != nil {
return errors.Wrap(err, "Failed to delete data")
}
@@ -347,10 +366,8 @@ func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error {
// List returns any keys that match, or an empty list with no error if none matched.
func (n *natsStore) List(opts ...store.ListOption) ([]string, error) {
if n.conn == nil {
if err := n.Init(); err != nil {
return nil, err
}
if err := n.initConn(); err != nil {
return nil, err
}
opt := store.ListOptions{}
@@ -366,10 +383,11 @@ func (n *natsStore) List(opts ...store.ListOption) ([]string, error) {
opt.Table = n.opts.Table
}
store, ok := n.buckets[opt.Database]
s, ok := n.buckets.Load(opt.Database)
if !ok {
return nil, ErrBucketNotFound
}
store := s.(nats.ObjectStore)
objects, err := store.List()
if err != nil {
@@ -399,7 +417,7 @@ func (n *natsStore) List(opts ...store.ListOption) ([]string, error) {
return keys, nil
}
// Close the store
// Close the store.
func (n *natsStore) Close() error {
n.conn.Close()
return nil
@@ -430,6 +448,31 @@ func (n *natsStore) createNewBucket(name string) (nats.ObjectStore, error) {
if err != nil {
return nil, errors.Wrapf(err, "Failed to create new bucket (%s)", name)
}
n.buckets[name] = store
n.buckets.Store(name, store)
return store, err
}
// thread safe way to initialize the connection.
func (n *natsStore) initConn() error {
if n.hasConn() {
return nil
}
n.Lock()
defer n.Unlock()
// check if conn was initialized meanwhile
if n.conn != nil {
return nil
}
return n.Init()
}
// thread safe way to check if n is initialized.
func (n *natsStore) hasConn() bool {
n.RLock()
defer n.RUnlock()
return n.conn != nil
}

View File

@@ -7,7 +7,7 @@ import (
"go-micro.dev/v4/store"
)
// store.Option
// store.Option.
type natsOptionsKey struct{}
type jsOptionsKey struct{}
type objOptionsKey struct{}
@@ -15,15 +15,15 @@ type ttlOptionsKey struct{}
type memoryOptionsKey struct{}
type descriptionOptionsKey struct{}
// store.DeleteOption
// store.DeleteOption.
type delBucketOptionsKey struct{}
// NatsOptions accepts nats.Options
// NatsOptions accepts nats.Options.
func NatsOptions(opts nats.Options) store.Option {
return setStoreOption(natsOptionsKey{}, opts)
}
// JetStreamOptions accepts multiple nats.JSOpt
// JetStreamOptions accepts multiple nats.JSOpt.
func JetStreamOptions(opts ...nats.JSOpt) store.Option {
return setStoreOption(jsOptionsKey{}, opts)
}
@@ -35,34 +35,42 @@ func ObjectStoreOptions(cfg ...*nats.ObjectStoreConfig) store.Option {
}
// DefaultTTL sets the default TTL to use for new buckets
// By default no TTL is set.
//
// By default no TTL is set.
//
// TTL ON INDIVIDUAL WRITE CALLS IS NOT SUPPORTED, only bucket wide TTL.
// Either set a default TTL with this option or provide bucket specific options
// with ObjectStoreOptions
//
// with ObjectStoreOptions
func DefaultTTL(ttl time.Duration) store.Option {
return setStoreOption(ttlOptionsKey{}, ttl)
}
// DefaultMemory sets the default storage type to memory only.
//
// The default is file storage, persisting storage between service restarts.
// The default is file storage, persisting storage between service restarts.
//
// Be aware that the default storage location of NATS the /tmp dir is, and thus
// won't persist reboots.
//
// won't persist reboots.
func DefaultMemory() store.Option {
return setStoreOption(memoryOptionsKey{}, nats.MemoryStorage)
}
// DefaultDescription sets the default description to use when creating new
// buckets. The default is "Store managed by go-micro"
//
// buckets. The default is "Store managed by go-micro"
func DefaultDescription(text string) store.Option {
return setStoreOption(descriptionOptionsKey{}, text)
}
// DeleteBucket will use the key passed to Delete as a bucket (database) name,
// and delete the bucket.
//
// and delete the bucket.
//
// This option should not be combined with the store.DeleteFrom option, as
// that will overwrite the delete action.
//
// that will overwrite the delete action.
func DeleteBucket() store.DeleteOption {
return func(d *store.DeleteOptions) {
d.Table = "DELETE_BUCKET"