mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-06 04:09:40 -06:00
update reva to v2.17.0
This commit is contained in:
191
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/LICENSE
generated
vendored
Normal file
191
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/LICENSE
generated
vendored
Normal file
@@ -0,0 +1,191 @@
|
||||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
Copyright 2015 Asim Aslam.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
79
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/README.md
generated
vendored
Normal file
79
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/README.md
generated
vendored
Normal file
@@ -0,0 +1,79 @@
|
||||
# NATS JetStream Key Value Store Plugin
|
||||
|
||||
This plugin uses the NATS JetStream [KeyValue Store](https://docs.nats.io/nats-concepts/jetstream/key-value-store) to implement the Go-Micro store interface.
|
||||
|
||||
You can use this plugin like any other store plugin.
|
||||
To start a local NATS JetStream server run `nats-server -js`.
|
||||
|
||||
To manually create a new storage object call:
|
||||
|
||||
```go
|
||||
natsjskv.NewStore(opts ...store.Option)
|
||||
```
|
||||
|
||||
The Go-Micro store interface uses databases and tables to store keys. These translate
|
||||
to buckets (key value stores) and key prefixes. If no database (bucket name) is provided, "default" will be used.
|
||||
|
||||
You can call `Write` with any arbitrary database name, and if a bucket with that name does not exist yet,
|
||||
it will be automatically created.
|
||||
|
||||
If a table name is provided, it will use it to prefix the key as `<table>_<key>`.
|
||||
|
||||
To delete a bucket, and all the key/value pairs in it, pass the `DeleteBucket` option to the `Delete`
|
||||
method, then they key name will be interpreted as a bucket name, and the bucket will be deleted.
|
||||
|
||||
Next to the default store options, a few NATS specific options are available:
|
||||
|
||||
|
||||
```go
|
||||
// NatsOptions accepts nats.Options
|
||||
NatsOptions(opts nats.Options)
|
||||
|
||||
// JetStreamOptions accepts multiple nats.JSOpt
|
||||
JetStreamOptions(opts ...nats.JSOpt)
|
||||
|
||||
// KeyValueOptions accepts multiple nats.KeyValueConfig
|
||||
// This will create buckets with the provided configs at initialization.
|
||||
//
|
||||
// type KeyValueConfig struct {
|
||||
// Bucket string
|
||||
// Description string
|
||||
// MaxValueSize int32
|
||||
// History uint8
|
||||
// TTL time.Duration
|
||||
// MaxBytes int64
|
||||
// Storage StorageType
|
||||
// Replicas int
|
||||
// Placement *Placement
|
||||
// RePublish *RePublish
|
||||
// Mirror *StreamSource
|
||||
// Sources []*StreamSource
|
||||
}
|
||||
KeyValueOptions(cfg ...*nats.KeyValueConfig)
|
||||
|
||||
// DefaultTTL sets the default TTL to use for new buckets
|
||||
// By default no TTL is set.
|
||||
//
|
||||
// TTL ON INDIVIDUAL WRITE CALLS IS NOT SUPPORTED, only bucket wide TTL.
|
||||
// Either set a default TTL with this option or provide bucket specific options
|
||||
// with ObjectStoreOptions
|
||||
DefaultTTL(ttl time.Duration)
|
||||
|
||||
// DefaultMemory sets the default storage type to memory only.
|
||||
//
|
||||
// The default is file storage, persisting storage between service restarts.
|
||||
// Be aware that the default storage location of NATS the /tmp dir is, and thus
|
||||
// won't persist reboots.
|
||||
DefaultMemory()
|
||||
|
||||
// DefaultDescription sets the default description to use when creating new
|
||||
// buckets. The default is "Store managed by go-micro"
|
||||
DefaultDescription(text string)
|
||||
|
||||
// DeleteBucket will use the key passed to Delete as a bucket (database) name,
|
||||
// and delete the bucket.
|
||||
// This option should not be combined with the store.DeleteFrom option, as
|
||||
// that will overwrite the delete action.
|
||||
DeleteBucket()
|
||||
```
|
||||
|
||||
18
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/context.go
generated
vendored
Normal file
18
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/context.go
generated
vendored
Normal file
@@ -0,0 +1,18 @@
|
||||
package natsjskv
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
// setStoreOption returns a function to setup a context with given value.
|
||||
func setStoreOption(k, v interface{}) store.Option {
|
||||
return func(o *store.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
|
||||
o.Context = context.WithValue(o.Context, k, v)
|
||||
}
|
||||
}
|
||||
109
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/keys.go
generated
vendored
Normal file
109
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/keys.go
generated
vendored
Normal file
@@ -0,0 +1,109 @@
|
||||
package natsjskv
|
||||
|
||||
import (
|
||||
"encoding/base32"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// NatsKey is a convenience function to create a key for the nats kv store.
|
||||
func NatsKey(table, microkey string) string {
|
||||
return NewKey(table, microkey, "").NatsKey()
|
||||
}
|
||||
|
||||
// MicroKey is a convenience function to create a key for the micro interface.
|
||||
func MicroKey(table, natskey string) string {
|
||||
return NewKey(table, "", natskey).MicroKey()
|
||||
}
|
||||
|
||||
// MicroKeyFilter is a convenience function to create a key for the micro interface.
|
||||
// It returns false if the key does not match the table, prefix or suffix.
|
||||
func MicroKeyFilter(table, natskey string, prefix, suffix string) (string, bool) {
|
||||
k := NewKey(table, "", natskey)
|
||||
return k.MicroKey(), k.Check(table, prefix, suffix)
|
||||
}
|
||||
|
||||
// Key represents a key in the store.
|
||||
// They are used to convert nats keys (base64 encoded) to micro keys (plain text - no table prefix) and vice versa.
|
||||
type Key struct {
|
||||
// Plain is the plain key as requested by the go-micro interface.
|
||||
Plain string
|
||||
// Full is the full key including the table prefix.
|
||||
Full string
|
||||
// Encoded is the base64 encoded key as used by the nats kv store.
|
||||
Encoded string
|
||||
}
|
||||
|
||||
// NewKey creates a new key. Either plain or encoded must be set.
|
||||
func NewKey(table string, plain, encoded string) *Key {
|
||||
k := &Key{
|
||||
Plain: plain,
|
||||
Encoded: encoded,
|
||||
}
|
||||
|
||||
switch {
|
||||
case k.Plain != "":
|
||||
k.Full = getKey(k.Plain, table)
|
||||
k.Encoded = encode(k.Full)
|
||||
case k.Encoded != "":
|
||||
k.Full = decode(k.Encoded)
|
||||
k.Plain = trimKey(k.Full, table)
|
||||
}
|
||||
|
||||
return k
|
||||
}
|
||||
|
||||
// NatsKey returns a key the nats kv store can work with.
|
||||
func (k *Key) NatsKey() string {
|
||||
return k.Encoded
|
||||
}
|
||||
|
||||
// MicroKey returns a key the micro interface can work with.
|
||||
func (k *Key) MicroKey() string {
|
||||
return k.Plain
|
||||
}
|
||||
|
||||
// Check returns false if the key does not match the table, prefix or suffix.
|
||||
func (k *Key) Check(table, prefix, suffix string) bool {
|
||||
if table != "" && k.Full != getKey(k.Plain, table) {
|
||||
return false
|
||||
}
|
||||
|
||||
if prefix != "" && !strings.HasPrefix(k.Plain, prefix) {
|
||||
return false
|
||||
}
|
||||
|
||||
if suffix != "" && !strings.HasSuffix(k.Plain, suffix) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func encode(s string) string {
|
||||
return base32.StdEncoding.EncodeToString([]byte(s))
|
||||
}
|
||||
|
||||
func decode(s string) string {
|
||||
b, err := base32.StdEncoding.DecodeString(s)
|
||||
if err != nil {
|
||||
return s
|
||||
}
|
||||
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func getKey(key, table string) string {
|
||||
if table != "" {
|
||||
return table + "_" + key
|
||||
}
|
||||
|
||||
return key
|
||||
}
|
||||
|
||||
func trimKey(key, table string) string {
|
||||
if table != "" {
|
||||
return strings.TrimPrefix(key, table+"_")
|
||||
}
|
||||
|
||||
return key
|
||||
}
|
||||
479
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go
generated
vendored
Normal file
479
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go
generated
vendored
Normal file
@@ -0,0 +1,479 @@
|
||||
// Package natsjskv is a go-micro store plugin for NATS JetStream Key-Value store.
|
||||
package natsjskv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cornelk/hashmap"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/pkg/errors"
|
||||
"go-micro.dev/v4/store"
|
||||
"go-micro.dev/v4/util/cmd"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrBucketNotFound is returned when the requested bucket does not exist.
|
||||
ErrBucketNotFound = errors.New("Bucket (database) not found")
|
||||
)
|
||||
|
||||
// KeyValueEnvelope is the data structure stored in the key value store.
|
||||
type KeyValueEnvelope struct {
|
||||
Key string `json:"key"`
|
||||
Data []byte `json:"data"`
|
||||
Metadata map[string]interface{} `json:"metadata"`
|
||||
}
|
||||
|
||||
type natsStore struct {
|
||||
sync.Once
|
||||
sync.RWMutex
|
||||
|
||||
ttl time.Duration
|
||||
storageType nats.StorageType
|
||||
description string
|
||||
|
||||
opts store.Options
|
||||
nopts nats.Options
|
||||
jsopts []nats.JSOpt
|
||||
kvConfigs []*nats.KeyValueConfig
|
||||
|
||||
conn *nats.Conn
|
||||
js nats.JetStreamContext
|
||||
buckets *hashmap.Map[string, nats.KeyValue]
|
||||
}
|
||||
|
||||
func init() {
|
||||
cmd.DefaultStores["natsjskv"] = NewStore
|
||||
}
|
||||
|
||||
// NewStore will create a new NATS JetStream Object Store.
|
||||
func NewStore(opts ...store.Option) store.Store {
|
||||
options := store.Options{
|
||||
Nodes: []string{},
|
||||
Database: "default",
|
||||
Table: "",
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
n := &natsStore{
|
||||
description: "KeyValue storage administered by go-micro store plugin",
|
||||
opts: options,
|
||||
jsopts: []nats.JSOpt{},
|
||||
kvConfigs: []*nats.KeyValueConfig{},
|
||||
buckets: hashmap.New[string, nats.KeyValue](),
|
||||
storageType: nats.FileStorage,
|
||||
}
|
||||
|
||||
n.setOption(opts...)
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
// Init initializes the store. It must perform any required setup on the
|
||||
// backing storage implementation and check that it is ready for use,
|
||||
// returning any errors.
|
||||
func (n *natsStore) Init(opts ...store.Option) error {
|
||||
n.setOption(opts...)
|
||||
|
||||
// Connect to NATS servers
|
||||
conn, err := n.nopts.Connect()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Failed to connect to NATS Server")
|
||||
}
|
||||
|
||||
// Create JetStream context
|
||||
js, err := conn.JetStream(n.jsopts...)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Failed to create JetStream context")
|
||||
}
|
||||
|
||||
n.conn = conn
|
||||
n.js = js
|
||||
|
||||
// Create default config if no configs present
|
||||
if len(n.kvConfigs) == 0 {
|
||||
if _, err := n.mustGetBucketByName(n.opts.Database); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Create kv store buckets
|
||||
for _, cfg := range n.kvConfigs {
|
||||
if _, err := n.mustGetBucket(cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *natsStore) setOption(opts ...store.Option) {
|
||||
for _, o := range opts {
|
||||
o(&n.opts)
|
||||
}
|
||||
|
||||
n.Once.Do(func() {
|
||||
n.nopts = nats.GetDefaultOptions()
|
||||
})
|
||||
|
||||
// Extract options from context
|
||||
if nopts, ok := n.opts.Context.Value(natsOptionsKey{}).(nats.Options); ok {
|
||||
n.nopts = nopts
|
||||
}
|
||||
|
||||
if jsopts, ok := n.opts.Context.Value(jsOptionsKey{}).([]nats.JSOpt); ok {
|
||||
n.jsopts = append(n.jsopts, jsopts...)
|
||||
}
|
||||
|
||||
if cfg, ok := n.opts.Context.Value(kvOptionsKey{}).([]*nats.KeyValueConfig); ok {
|
||||
n.kvConfigs = append(n.kvConfigs, cfg...)
|
||||
}
|
||||
|
||||
if ttl, ok := n.opts.Context.Value(ttlOptionsKey{}).(time.Duration); ok {
|
||||
n.ttl = ttl
|
||||
}
|
||||
|
||||
if sType, ok := n.opts.Context.Value(memoryOptionsKey{}).(nats.StorageType); ok {
|
||||
n.storageType = sType
|
||||
}
|
||||
|
||||
if text, ok := n.opts.Context.Value(descriptionOptionsKey{}).(string); ok {
|
||||
n.description = text
|
||||
}
|
||||
|
||||
// Assign store option server addresses to nats options
|
||||
if len(n.opts.Nodes) > 0 {
|
||||
n.nopts.Url = ""
|
||||
n.nopts.Servers = n.opts.Nodes
|
||||
}
|
||||
|
||||
if len(n.nopts.Servers) == 0 && n.nopts.Url == "" {
|
||||
n.nopts.Url = nats.DefaultURL
|
||||
}
|
||||
}
|
||||
|
||||
// Options allows you to view the current options.
|
||||
func (n *natsStore) Options() store.Options {
|
||||
return n.opts
|
||||
}
|
||||
|
||||
// Read takes a single key name and optional ReadOptions. It returns matching []*Record or an error.
|
||||
func (n *natsStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
|
||||
if err := n.initConn(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opt := store.ReadOptions{}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&opt)
|
||||
}
|
||||
|
||||
if opt.Database == "" {
|
||||
opt.Database = n.opts.Database
|
||||
}
|
||||
|
||||
if opt.Table == "" {
|
||||
opt.Table = n.opts.Table
|
||||
}
|
||||
|
||||
bucket, ok := n.buckets.Get(opt.Database)
|
||||
if !ok {
|
||||
return nil, ErrBucketNotFound
|
||||
}
|
||||
|
||||
keys, err := n.natsKeys(bucket, opt.Table, key, opt.Prefix, opt.Suffix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
records := make([]*store.Record, 0, len(keys))
|
||||
|
||||
for _, key := range keys {
|
||||
rec, ok, err := n.getRecord(bucket, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if ok {
|
||||
records = append(records, rec)
|
||||
}
|
||||
}
|
||||
|
||||
return enforceLimits(records, opt.Limit, opt.Offset), nil
|
||||
}
|
||||
|
||||
// Write writes a record to the store, and returns an error if the record was not written.
|
||||
func (n *natsStore) Write(rec *store.Record, opts ...store.WriteOption) error {
|
||||
if err := n.initConn(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
opt := store.WriteOptions{}
|
||||
for _, o := range opts {
|
||||
o(&opt)
|
||||
}
|
||||
|
||||
if opt.Database == "" {
|
||||
opt.Database = n.opts.Database
|
||||
}
|
||||
|
||||
if opt.Table == "" {
|
||||
opt.Table = n.opts.Table
|
||||
}
|
||||
|
||||
store, err := n.mustGetBucketByName(opt.Database)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b, err := json.Marshal(KeyValueEnvelope{
|
||||
Key: rec.Key,
|
||||
Data: rec.Value,
|
||||
Metadata: rec.Metadata,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Failed to marshal object")
|
||||
}
|
||||
|
||||
if _, err := store.Put(NatsKey(opt.Table, rec.Key), b); err != nil {
|
||||
return errors.Wrapf(err, "Failed to store data in bucket '%s'", NatsKey(opt.Table, rec.Key))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete removes the record with the corresponding key from the store.
|
||||
func (n *natsStore) Delete(key string, opts ...store.DeleteOption) error {
|
||||
if err := n.initConn(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
opt := store.DeleteOptions{}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&opt)
|
||||
}
|
||||
|
||||
if opt.Database == "" {
|
||||
opt.Database = n.opts.Database
|
||||
}
|
||||
|
||||
if opt.Table == "" {
|
||||
opt.Table = n.opts.Table
|
||||
}
|
||||
|
||||
if opt.Table == "DELETE_BUCKET" {
|
||||
n.buckets.Del(key)
|
||||
|
||||
if err := n.js.DeleteKeyValue(key); err != nil {
|
||||
return errors.Wrap(err, "Failed to delete bucket")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
store, ok := n.buckets.Get(opt.Database)
|
||||
if !ok {
|
||||
return ErrBucketNotFound
|
||||
}
|
||||
|
||||
if err := store.Delete(NatsKey(opt.Table, key)); err != nil {
|
||||
return errors.Wrap(err, "Failed to delete data")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// List returns any keys that match, or an empty list with no error if none matched.
|
||||
func (n *natsStore) List(opts ...store.ListOption) ([]string, error) {
|
||||
if err := n.initConn(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opt := store.ListOptions{}
|
||||
for _, o := range opts {
|
||||
o(&opt)
|
||||
}
|
||||
|
||||
if opt.Database == "" {
|
||||
opt.Database = n.opts.Database
|
||||
}
|
||||
|
||||
if opt.Table == "" {
|
||||
opt.Table = n.opts.Table
|
||||
}
|
||||
|
||||
store, ok := n.buckets.Get(opt.Database)
|
||||
if !ok {
|
||||
return nil, ErrBucketNotFound
|
||||
}
|
||||
|
||||
keys, err := n.microKeys(store, opt.Table, opt.Prefix, opt.Suffix)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Failed to list keys in bucket")
|
||||
}
|
||||
|
||||
return enforceLimits(keys, opt.Limit, opt.Offset), nil
|
||||
}
|
||||
|
||||
// Close the store.
|
||||
func (n *natsStore) Close() error {
|
||||
n.conn.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns the name of the implementation.
|
||||
func (n *natsStore) String() string {
|
||||
return "NATS JetStream KeyValueStore"
|
||||
}
|
||||
|
||||
// thread safe way to initialize the connection.
|
||||
func (n *natsStore) initConn() error {
|
||||
if n.hasConn() {
|
||||
return nil
|
||||
}
|
||||
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
// check if conn was initialized meanwhile
|
||||
if n.conn != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return n.Init()
|
||||
}
|
||||
|
||||
// thread safe way to check if n is initialized.
|
||||
func (n *natsStore) hasConn() bool {
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
return n.conn != nil
|
||||
}
|
||||
|
||||
// mustGetDefaultBucket returns the bucket with the given name creating it with default configuration if needed.
|
||||
func (n *natsStore) mustGetBucketByName(name string) (nats.KeyValue, error) {
|
||||
return n.mustGetBucket(&nats.KeyValueConfig{
|
||||
Bucket: name,
|
||||
Description: n.description,
|
||||
TTL: n.ttl,
|
||||
Storage: n.storageType,
|
||||
})
|
||||
}
|
||||
|
||||
// mustGetBucket creates a new bucket if it does not exist yet.
|
||||
func (n *natsStore) mustGetBucket(kv *nats.KeyValueConfig) (nats.KeyValue, error) {
|
||||
if store, ok := n.buckets.Get(kv.Bucket); ok {
|
||||
return store, nil
|
||||
}
|
||||
|
||||
store, err := n.js.KeyValue(kv.Bucket)
|
||||
if err != nil {
|
||||
if !errors.Is(err, nats.ErrBucketNotFound) {
|
||||
return nil, errors.Wrapf(err, "Failed to get bucket (%s)", kv.Bucket)
|
||||
}
|
||||
|
||||
store, err = n.js.CreateKeyValue(kv)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "Failed to create bucket (%s)", kv.Bucket)
|
||||
}
|
||||
}
|
||||
|
||||
n.buckets.Set(kv.Bucket, store)
|
||||
|
||||
return store, nil
|
||||
}
|
||||
|
||||
// getRecord returns the record with the given key from the nats kv store.
|
||||
func (n *natsStore) getRecord(bucket nats.KeyValue, key string) (*store.Record, bool, error) {
|
||||
obj, err := bucket.Get(key)
|
||||
if errors.Is(err, nats.ErrKeyNotFound) {
|
||||
return nil, false, nil
|
||||
} else if err != nil {
|
||||
return nil, false, errors.Wrap(err, "Failed to get object from bucket")
|
||||
}
|
||||
|
||||
var kv KeyValueEnvelope
|
||||
if err := json.Unmarshal(obj.Value(), &kv); err != nil {
|
||||
return nil, false, errors.Wrap(err, "Failed to unmarshal object")
|
||||
}
|
||||
|
||||
if obj.Operation() != nats.KeyValuePut {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
return &store.Record{
|
||||
Key: kv.Key,
|
||||
Value: kv.Data,
|
||||
Metadata: kv.Metadata,
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
func (n *natsStore) natsKeys(bucket nats.KeyValue, table, key string, prefix, suffix bool) ([]string, error) {
|
||||
if !suffix && !prefix {
|
||||
return []string{NatsKey(table, key)}, nil
|
||||
}
|
||||
|
||||
toS := func(s string, b bool) string {
|
||||
if b {
|
||||
return s
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
keys, _, err := n.getKeys(bucket, table, toS(key, prefix), toS(key, suffix))
|
||||
|
||||
return keys, err
|
||||
}
|
||||
|
||||
func (n *natsStore) microKeys(bucket nats.KeyValue, table, prefix, suffix string) ([]string, error) {
|
||||
_, keys, err := n.getKeys(bucket, table, prefix, suffix)
|
||||
|
||||
return keys, err
|
||||
}
|
||||
|
||||
func (n *natsStore) getKeys(bucket nats.KeyValue, table string, prefix, suffix string) ([]string, []string, error) {
|
||||
names, err := bucket.Keys(nats.IgnoreDeletes())
|
||||
if errors.Is(err, nats.ErrKeyNotFound) {
|
||||
return []string{}, []string{}, nil
|
||||
} else if err != nil {
|
||||
return []string{}, []string{}, errors.Wrap(err, "Failed to list objects")
|
||||
}
|
||||
|
||||
natsKeys := make([]string, 0, len(names))
|
||||
microKeys := make([]string, 0, len(names))
|
||||
|
||||
for _, k := range names {
|
||||
mkey, ok := MicroKeyFilter(table, k, prefix, suffix)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
natsKeys = append(natsKeys, k)
|
||||
microKeys = append(microKeys, mkey)
|
||||
}
|
||||
|
||||
return natsKeys, microKeys, nil
|
||||
}
|
||||
|
||||
// enforces offset and limit without causing a panic.
|
||||
func enforceLimits[V any](recs []V, limit, offset uint) []V {
|
||||
l := uint(len(recs))
|
||||
|
||||
from := offset
|
||||
if from > l {
|
||||
from = l
|
||||
}
|
||||
|
||||
to := l
|
||||
if limit > 0 && offset+limit < l {
|
||||
to = offset + limit
|
||||
}
|
||||
|
||||
return recs[from:to]
|
||||
}
|
||||
75
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/options.go
generated
vendored
Normal file
75
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/options.go
generated
vendored
Normal file
@@ -0,0 +1,75 @@
|
||||
package natsjskv
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
// store.Option.
|
||||
type natsOptionsKey struct{}
|
||||
type jsOptionsKey struct{}
|
||||
type kvOptionsKey struct{}
|
||||
type ttlOptionsKey struct{}
|
||||
type memoryOptionsKey struct{}
|
||||
type descriptionOptionsKey struct{}
|
||||
|
||||
// NatsOptions accepts nats.Options.
|
||||
func NatsOptions(opts nats.Options) store.Option {
|
||||
return setStoreOption(natsOptionsKey{}, opts)
|
||||
}
|
||||
|
||||
// JetStreamOptions accepts multiple nats.JSOpt.
|
||||
func JetStreamOptions(opts ...nats.JSOpt) store.Option {
|
||||
return setStoreOption(jsOptionsKey{}, opts)
|
||||
}
|
||||
|
||||
// KeyValueOptions accepts multiple nats.KeyValueConfig
|
||||
// This will create buckets with the provided configs at initialization.
|
||||
func KeyValueOptions(cfg ...*nats.KeyValueConfig) store.Option {
|
||||
return setStoreOption(kvOptionsKey{}, cfg)
|
||||
}
|
||||
|
||||
// DefaultTTL sets the default TTL to use for new buckets
|
||||
//
|
||||
// By default no TTL is set.
|
||||
//
|
||||
// TTL ON INDIVIDUAL WRITE CALLS IS NOT SUPPORTED, only bucket wide TTL.
|
||||
// Either set a default TTL with this option or provide bucket specific options
|
||||
//
|
||||
// with ObjectStoreOptions
|
||||
func DefaultTTL(ttl time.Duration) store.Option {
|
||||
return setStoreOption(ttlOptionsKey{}, ttl)
|
||||
}
|
||||
|
||||
// DefaultMemory sets the default storage type to memory only.
|
||||
//
|
||||
// The default is file storage, persisting storage between service restarts.
|
||||
//
|
||||
// Be aware that the default storage location of NATS the /tmp dir is, and thus
|
||||
//
|
||||
// won't persist reboots.
|
||||
func DefaultMemory() store.Option {
|
||||
return setStoreOption(memoryOptionsKey{}, nats.MemoryStorage)
|
||||
}
|
||||
|
||||
// DefaultDescription sets the default description to use when creating new
|
||||
//
|
||||
// buckets. The default is "Store managed by go-micro"
|
||||
func DefaultDescription(text string) store.Option {
|
||||
return setStoreOption(descriptionOptionsKey{}, text)
|
||||
}
|
||||
|
||||
// DeleteBucket will use the key passed to Delete as a bucket (database) name,
|
||||
//
|
||||
// and delete the bucket.
|
||||
//
|
||||
// This option should not be combined with the store.DeleteFrom option, as
|
||||
//
|
||||
// that will overwrite the delete action.
|
||||
func DeleteBucket() store.DeleteOption {
|
||||
return func(d *store.DeleteOptions) {
|
||||
d.Table = "DELETE_BUCKET"
|
||||
}
|
||||
}
|
||||
138
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/test_data.go
generated
vendored
Normal file
138
vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/test_data.go
generated
vendored
Normal file
@@ -0,0 +1,138 @@
|
||||
package natsjskv
|
||||
|
||||
import "go-micro.dev/v4/store"
|
||||
|
||||
type test struct {
|
||||
Record *store.Record
|
||||
Database string
|
||||
Table string
|
||||
}
|
||||
|
||||
var (
|
||||
table = []test{
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "One",
|
||||
Value: []byte("First value"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "Two",
|
||||
Value: []byte("Second value"),
|
||||
},
|
||||
Table: "prefix_test",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "Third",
|
||||
Value: []byte("Third value"),
|
||||
},
|
||||
Database: "new-bucket",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "Four",
|
||||
Value: []byte("Fourth value"),
|
||||
},
|
||||
Database: "new-bucket",
|
||||
Table: "prefix_test",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "empty-value",
|
||||
Value: []byte{},
|
||||
},
|
||||
Database: "new-bucket",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "Alex",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
Database: "prefix-test",
|
||||
Table: "names",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "Jones",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
Database: "prefix-test",
|
||||
Table: "names",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "Adrianna",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
Database: "prefix-test",
|
||||
Table: "names",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "MexicoCity",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
Database: "prefix-test",
|
||||
Table: "cities",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "HoustonCity",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
Database: "prefix-test",
|
||||
Table: "cities",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "ZurichCity",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
Database: "prefix-test",
|
||||
Table: "cities",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "Helsinki",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
Database: "prefix-test",
|
||||
Table: "cities",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "testKeytest",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
Table: "some_table",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "testSecondtest",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
Table: "some_table",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "lalala",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
Table: "some_table",
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "testAnothertest",
|
||||
Value: []byte("Some value"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Record: &store.Record{
|
||||
Key: "FobiddenCharactersAreAllowed:|@..+",
|
||||
Value: []byte("data no matter"),
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
Reference in New Issue
Block a user