mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-02-10 14:09:05 -06:00
reva-bump-2.42.1 (#2225)
This commit is contained in:
67
vendor/github.com/opencloud-eu/reva/v2/pkg/store/store.go
generated
vendored
67
vendor/github.com/opencloud-eu/reva/v2/pkg/store/store.go
generated
vendored
@@ -21,6 +21,7 @@ package store
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -32,6 +33,7 @@ import (
|
||||
"github.com/opencloud-eu/reva/v2/pkg/store/etcd"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/store/memory"
|
||||
"go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/store"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
@@ -125,19 +127,33 @@ func Create(opts ...microstore.Option) microstore.Store {
|
||||
return *ocMemStore
|
||||
case TypeNatsJS:
|
||||
opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts)
|
||||
return natsjs.NewStore(
|
||||
store := natsjs.NewStore(
|
||||
append(opts,
|
||||
natsjs.NatsOptions(natsOptions), // always pass in properly initialized default nats options
|
||||
natsjs.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL
|
||||
)
|
||||
|
||||
err := updateNatsStore(opts, ttl, natsOptions)
|
||||
if err != nil {
|
||||
options.Logger.Logf(logger.ErrorLevel, "failed to update nats-js store: '%s'", err.Error())
|
||||
}
|
||||
|
||||
return store
|
||||
case TypeNatsJSKV:
|
||||
opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts)
|
||||
return natsjskv.NewStore(
|
||||
store := natsjskv.NewStore(
|
||||
append(opts,
|
||||
natsjskv.NatsOptions(natsOptions), // always pass in properly initialized default nats options
|
||||
natsjskv.EncodeKeys(), // nats has restrictions on the key, we cannot use slashes
|
||||
natsjskv.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL
|
||||
)
|
||||
|
||||
err := updateNatsStore(opts, ttl, natsOptions)
|
||||
if err != nil {
|
||||
options.Logger.Logf(logger.ErrorLevel, "failed to update nats-js-kv store: '%s'", err.Error())
|
||||
}
|
||||
|
||||
return store
|
||||
case TypeMemory, "mem", "": // allow existing short form and use as default
|
||||
return microstore.NewMemoryStore(opts...)
|
||||
default:
|
||||
@@ -146,13 +162,58 @@ func Create(opts ...microstore.Option) microstore.Store {
|
||||
}
|
||||
}
|
||||
|
||||
func updateNatsStore(opts []store.Option, ttl time.Duration, natsOptions nats.Options) error {
|
||||
options := store.Options{}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
bucketName := options.Database
|
||||
if bucketName == "" {
|
||||
return fmt.Errorf("bucket name (database) must be set")
|
||||
}
|
||||
|
||||
if len(options.Nodes) > 0 {
|
||||
natsOptions.Servers = options.Nodes
|
||||
}
|
||||
nc, err := natsOptions.Connect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not connect to nats: %w", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
js, err := nc.JetStream()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// NATS KV buckets are actually streams named "KV_<bucket_name>"
|
||||
info, err := js.StreamInfo("KV_" + bucketName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get bucket info: %w", err)
|
||||
}
|
||||
|
||||
config := info.Config
|
||||
config.MaxAge = ttl
|
||||
|
||||
_, err = js.UpdateStream(&config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update bucket TTL: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func natsConfig(log logger.Logger, ctx context.Context, opts []microstore.Option) ([]microstore.Option, time.Duration, nats.Options) {
|
||||
|
||||
if mem, _ := ctx.Value(disablePersistanceContextKey{}).(bool); mem {
|
||||
opts = append(opts, natsjs.DefaultMemory())
|
||||
}
|
||||
|
||||
ttl, _ := ctx.Value(ttlContextKey{}).(time.Duration)
|
||||
ttl := time.Duration(0)
|
||||
if d, ok := ctx.Value(ttlContextKey{}).(time.Duration); ok {
|
||||
ttl = d
|
||||
}
|
||||
|
||||
// preparing natsOptions before the switch to reuse the same code
|
||||
natsOptions := nats.GetDefaultOptions()
|
||||
|
||||
Reference in New Issue
Block a user