mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-05-05 11:00:12 -05:00
Proxy accesstoken cache store (#5829)
* refactor middleware options Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * use ocmemstore micro store implementaiton for token cache Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * refactor ocis store options, support redis sentinel Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * align cache configuration Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * database and tabe are used to build prefixes for inmemory stores Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * add global persistent store options to userlog config Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * log cache errors but continue Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * drup unnecessary type conversion Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * Better description for the default userinfo ttl Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * use global cache options for even more caches Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * don't log userinfo cache misses Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * default to stock memory store Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * use correct mem store typo string Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * split cache options, doc cleanup Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * mint and write userinfo to cache async Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * use hashed token as key Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * go mod tidy Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * update docs Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * update cache store naming Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * bring back depreceted ocis-pkg/store package for backwards compatability Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * update changelog Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * Apply suggestions from code review Co-authored-by: kobergj <jkoberg@owncloud.com> * revert ocis-pkg/cache to store rename Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> * add waiting for each step 50 milliseconds * starlack check --------- Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de> Co-authored-by: kobergj <jkoberg@owncloud.com> Co-authored-by: Viktor Scharf <scharf.vi@gmail.com>
This commit is contained in:
committed by
GitHub
parent
688d07e297
commit
6bec87f582
@@ -54,7 +54,7 @@ type Config struct {
|
||||
|
||||
Tracing *shared.Tracing `yaml:"tracing"`
|
||||
Log *shared.Log `yaml:"log"`
|
||||
CacheStore *shared.CacheStore `yaml:"cache_store"`
|
||||
Cache *shared.Cache `yaml:"cache"`
|
||||
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
|
||||
GRPCServiceTLS *shared.GRPCServiceTLS `yaml:"grpc_service_tls"`
|
||||
HTTPServiceTLS shared.HTTPServiceTLS `yaml:"http_service_tls"`
|
||||
|
||||
@@ -49,8 +49,8 @@ func EnsureDefaults(cfg *config.Config) {
|
||||
if cfg.TokenManager == nil {
|
||||
cfg.TokenManager = &shared.TokenManager{}
|
||||
}
|
||||
if cfg.CacheStore == nil {
|
||||
cfg.CacheStore = &shared.CacheStore{}
|
||||
if cfg.Cache == nil {
|
||||
cfg.Cache = &shared.Cache{}
|
||||
}
|
||||
if cfg.GRPCClientTLS == nil {
|
||||
cfg.GRPCClientTLS = &shared.GRPCClientTLS{}
|
||||
@@ -70,7 +70,7 @@ func EnsureCommons(cfg *config.Config) {
|
||||
|
||||
cfg.Commons.Log = structs.CopyOrZeroValue(cfg.Log)
|
||||
cfg.Commons.Tracing = structs.CopyOrZeroValue(cfg.Tracing)
|
||||
cfg.Commons.CacheStore = structs.CopyOrZeroValue(cfg.CacheStore)
|
||||
cfg.Commons.Cache = structs.CopyOrZeroValue(cfg.Cache)
|
||||
|
||||
if cfg.GRPCClientTLS != nil {
|
||||
cfg.Commons.GRPCClientTLS = cfg.GRPCClientTLS
|
||||
|
||||
@@ -1,36 +0,0 @@
|
||||
package roles
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/sync"
|
||||
settingsmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/settings/v0"
|
||||
)
|
||||
|
||||
// cache is a cache implementation for roles, keyed by roleIDs.
|
||||
type cache struct {
|
||||
sc sync.Cache
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
// newCache returns a new instance of Cache.
|
||||
func newCache(capacity int, ttl time.Duration) cache {
|
||||
return cache{
|
||||
ttl: ttl,
|
||||
sc: sync.NewCache(capacity),
|
||||
}
|
||||
}
|
||||
|
||||
// get gets a role-bundle by a given `roleID`.
|
||||
func (c *cache) get(roleID string) *settingsmsg.Bundle {
|
||||
if ce := c.sc.Load(roleID); ce != nil {
|
||||
return ce.V.(*settingsmsg.Bundle)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// set sets a roleID / role-bundle.
|
||||
func (c *cache) set(roleID string, value *settingsmsg.Bundle) {
|
||||
c.sc.Store(roleID, value, time.Now().Add(c.ttl))
|
||||
}
|
||||
@@ -1,55 +0,0 @@
|
||||
package roles
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
settingsmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/settings/v0"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func cacheRunner(size int, ttl time.Duration) (*cache, func(f func(v string))) {
|
||||
c := newCache(size, ttl)
|
||||
run := func(f func(v string)) {
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < size; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
f(strconv.Itoa(i))
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
return &c, run
|
||||
}
|
||||
|
||||
func BenchmarkCache(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
size := 1024
|
||||
c, cr := cacheRunner(size, 100*time.Millisecond)
|
||||
|
||||
cr(func(v string) { c.set(v, &settingsmsg.Bundle{}) })
|
||||
cr(func(v string) { c.get(v) })
|
||||
}
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
size := 1024
|
||||
ttl := 100 * time.Millisecond
|
||||
c, cr := cacheRunner(size, ttl)
|
||||
|
||||
cr(func(v string) {
|
||||
c.set(v, &settingsmsg.Bundle{Id: v})
|
||||
})
|
||||
|
||||
assert.Equal(t, "50", c.get("50").Id, "it returns the right bundle")
|
||||
assert.Nil(t, c.get("unknown"), "unknown bundle ist nil")
|
||||
|
||||
time.Sleep(ttl + 1)
|
||||
// roles cache has no access to evict, adding new items triggers a cleanup
|
||||
c.set("evict", nil)
|
||||
assert.Nil(t, c.get("50"), "old bundles get removed")
|
||||
}
|
||||
+11
-11
@@ -5,23 +5,23 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
ocisstore "github.com/owncloud/ocis/v2/ocis-pkg/store"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/store"
|
||||
settingsmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/settings/v0"
|
||||
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
|
||||
"go-micro.dev/v4/store"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
)
|
||||
|
||||
const (
|
||||
cacheDatabase = "ocis-pkg"
|
||||
cacheTableName = "ocis-pkg/roles"
|
||||
cacheTableName = "roles"
|
||||
cacheTTL = time.Hour
|
||||
)
|
||||
|
||||
// Manager manages a cache of roles by fetching unknown roles from the settings.RoleService.
|
||||
type Manager struct {
|
||||
logger log.Logger
|
||||
cache store.Store
|
||||
roleCache microstore.Store
|
||||
roleService settingssvc.RoleService
|
||||
}
|
||||
|
||||
@@ -29,9 +29,9 @@ type Manager struct {
|
||||
func NewManager(o ...Option) Manager {
|
||||
opts := newOptions(o...)
|
||||
|
||||
nStore := ocisstore.Create(opts.storeOptions...)
|
||||
nStore := store.Create(opts.storeOptions...)
|
||||
return Manager{
|
||||
cache: nStore,
|
||||
roleCache: nStore,
|
||||
roleService: opts.roleService,
|
||||
}
|
||||
}
|
||||
@@ -42,7 +42,7 @@ func (m *Manager) List(ctx context.Context, roleIDs []string) []*settingsmsg.Bun
|
||||
result := make([]*settingsmsg.Bundle, 0)
|
||||
lookup := make([]string, 0)
|
||||
for _, roleID := range roleIDs {
|
||||
if records, err := m.cache.Read(roleID, store.ReadFrom(cacheDatabase, cacheTableName)); err != nil {
|
||||
if records, err := m.roleCache.Read(roleID, microstore.ReadFrom(cacheDatabase, cacheTableName)); err != nil {
|
||||
lookup = append(lookup, roleID)
|
||||
} else {
|
||||
role := &settingsmsg.Bundle{}
|
||||
@@ -77,15 +77,15 @@ func (m *Manager) List(ctx context.Context, roleIDs []string) []*settingsmsg.Bun
|
||||
}
|
||||
for _, role := range res.Bundles {
|
||||
jsonbytes, _ := protojson.Marshal(role)
|
||||
record := &store.Record{
|
||||
record := µstore.Record{
|
||||
Key: role.Id,
|
||||
Value: jsonbytes,
|
||||
Expiry: cacheTTL,
|
||||
}
|
||||
err := m.cache.Write(
|
||||
err := m.roleCache.Write(
|
||||
record,
|
||||
store.WriteTo(cacheDatabase, cacheTableName),
|
||||
store.WriteTTL(cacheTTL),
|
||||
microstore.WriteTo(cacheDatabase, cacheTableName),
|
||||
microstore.WriteTTL(cacheTTL),
|
||||
)
|
||||
if err != nil {
|
||||
m.logger.Debug().Err(err).Msg("failed to cache roles")
|
||||
|
||||
@@ -2,13 +2,13 @@ package roles
|
||||
|
||||
import (
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
ocisstore "github.com/owncloud/ocis/v2/ocis-pkg/store"
|
||||
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
// Options are all the possible options.
|
||||
type Options struct {
|
||||
storeOptions []ocisstore.Option
|
||||
storeOptions []store.Option
|
||||
logger log.Logger
|
||||
roleService settingssvc.RoleService
|
||||
}
|
||||
@@ -31,7 +31,7 @@ func RoleService(rs settingssvc.RoleService) Option {
|
||||
}
|
||||
|
||||
// StoreOptions are the options for the store
|
||||
func StoreOptions(storeOpts []ocisstore.Option) Option {
|
||||
func StoreOptions(storeOpts []store.Option) Option {
|
||||
return func(o *Options) {
|
||||
o.storeOptions = storeOpts
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package shared
|
||||
|
||||
import "time"
|
||||
|
||||
// EnvBinding represents a direct binding from an env variable to a go kind. Along with gookit/config, its primal goal
|
||||
// is to unpack environment variables into a Go value. We do so with reflection, and this data structure is just a step
|
||||
// in between.
|
||||
@@ -53,10 +55,13 @@ type HTTPServiceTLS struct {
|
||||
Key string `yaml:"key" env:"OCIS_HTTP_TLS_KEY" desc:"Path/File name for the TLS certificate key (in PEM format) for the server certificate to use for the http services."`
|
||||
}
|
||||
|
||||
type CacheStore struct {
|
||||
Type string `yaml:"type" env:"OCIS_CACHE_STORE_TYPE" desc:"The type of the cache store. Valid options are \"noop\", \"ocmem\", \"etcd\" and \"memory\""`
|
||||
Address string `yaml:"address" env:"OCIS_CACHE_STORE_ADDRESS" desc:"A comma-separated list of addresses to connect to. Only valid if the above setting is set to \"etcd\""`
|
||||
Size int `yaml:"size" env:"OCIS_CACHE_STORE_SIZE" desc:"Maximum size for the cache store. Only ocmem will use this option, in number of items per table. The rest will ignore the option and can grow indefinitely"`
|
||||
type Cache struct {
|
||||
Store string `yaml:"store" env:"OCIS_CACHE_STORE;OCIS_CACHE_STORE_TYPE" desc:"The type of the cache store. Supported values are: 'memory', 'ocmem', 'etcd', 'redis', 'redis-sentinel', 'nats-js', 'noop'. See the text description for details."`
|
||||
Nodes []string `yaml:"nodes" env:"OCIS_CACHE_STORE_NODES;OCIS_CACHE_STORE_ADDRESSES" desc:"A comma separated list of nodes to access the configured store. This has no effect when 'in-memory' stores are configured. Note that the behaviour how nodes are used is dependent on the library of the configured store."`
|
||||
Database string `yaml:"database" env:"OCIS_CACHE_STORE_DATABASE" desc:"The database name the configured store should use."`
|
||||
Table string `yaml:"table" env:"OCIS_CACHE_STORE_TABLE" desc:"The database table the store should use."`
|
||||
TTL time.Duration `yaml:"ttl" env:"OCIS_CACHE_STORE_TTL" desc:"Time to live for events in the store. The duration can be set as number followed by a unit identifier like s, m or h."`
|
||||
Size int `yaml:"size" env:"OCIS_CACHE_STORE_SIZE" desc:"The maximum quantity of items in the store. Only applies when store type 'ocmem' is configured."`
|
||||
}
|
||||
|
||||
// Commons holds configuration that are common to all extensions. Each extension can then decide whether
|
||||
@@ -64,7 +69,7 @@ type CacheStore struct {
|
||||
type Commons struct {
|
||||
Log *Log `yaml:"log"`
|
||||
Tracing *Tracing `yaml:"tracing"`
|
||||
CacheStore *CacheStore `yaml:"cache_store"`
|
||||
Cache *Cache `yaml:"cache"`
|
||||
GRPCClientTLS *GRPCClientTLS `yaml:"grpc_client_tls"`
|
||||
GRPCServiceTLS *GRPCServiceTLS `yaml:"grpc_service_tls"`
|
||||
HTTPServiceTLS HTTPServiceTLS `yaml:"http_service_tls"`
|
||||
|
||||
@@ -0,0 +1,118 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
natsjs "github.com/go-micro/plugins/v4/store/nats-js"
|
||||
"github.com/go-micro/plugins/v4/store/redis"
|
||||
redisopts "github.com/go-redis/redis/v8"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/store/etcd"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/store/memory"
|
||||
"go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
var ocMemStore *store.Store
|
||||
|
||||
const (
|
||||
TypeMemory = "memory"
|
||||
TypeNoop = "noop"
|
||||
TypeEtcd = "etcd"
|
||||
TypeRedis = "redis"
|
||||
TypeRedisSentinel = "redis-sentinel"
|
||||
TypeOCMem = "ocmem"
|
||||
TypeNatsJS = "nats-js"
|
||||
)
|
||||
|
||||
// Create returns a configured key-value micro store
|
||||
//
|
||||
// Each microservice (or whatever piece is using the store) should use the
|
||||
// options available in the interface's operations to choose the right database
|
||||
// and table to prevent collisions with other microservices.
|
||||
// Recommended approach is to use "services" or "ocis-pkg" for the database,
|
||||
// and "services/<service-name>/" or "ocis-pkg/<pkg>/" for the package name.
|
||||
func Create(opts ...store.Option) store.Store {
|
||||
options := &store.Options{
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(options)
|
||||
}
|
||||
|
||||
storeType, _ := options.Context.Value(typeContextKey{}).(string)
|
||||
|
||||
switch storeType {
|
||||
case TypeNoop:
|
||||
return store.NewNoopStore(opts...)
|
||||
case TypeEtcd:
|
||||
return etcd.NewEtcdStore(opts...)
|
||||
case TypeRedis:
|
||||
// FIXME redis plugin does not support redis cluster or ring -> needs upstream patch or our implementation
|
||||
return redis.NewStore(opts...)
|
||||
case TypeRedisSentinel:
|
||||
redisMaster := ""
|
||||
redisNodes := []string{}
|
||||
for _, node := range options.Nodes {
|
||||
parts := strings.SplitN(node, "/", 2)
|
||||
if len(parts) != 2 {
|
||||
return nil
|
||||
}
|
||||
// the first node is used to retrieve the redis master
|
||||
redisNodes = append(redisNodes, parts[0])
|
||||
if redisMaster == "" {
|
||||
redisMaster = parts[1]
|
||||
}
|
||||
}
|
||||
return redis.NewStore(
|
||||
store.Database(options.Database),
|
||||
store.Table(options.Table),
|
||||
store.Nodes(redisNodes...),
|
||||
redis.WithRedisOptions(redisopts.UniversalOptions{
|
||||
MasterName: redisMaster,
|
||||
}),
|
||||
)
|
||||
case TypeOCMem:
|
||||
if ocMemStore == nil {
|
||||
var memStore store.Store
|
||||
|
||||
sizeNum, _ := options.Context.Value(sizeContextKey{}).(int)
|
||||
if sizeNum <= 0 {
|
||||
memStore = memory.NewMultiMemStore()
|
||||
} else {
|
||||
memStore = memory.NewMultiMemStore(
|
||||
store.WithContext(
|
||||
memory.NewContext(
|
||||
context.Background(),
|
||||
map[string]interface{}{
|
||||
"maxCap": sizeNum,
|
||||
},
|
||||
)),
|
||||
)
|
||||
}
|
||||
ocMemStore = &memStore
|
||||
}
|
||||
return *ocMemStore
|
||||
case TypeNatsJS:
|
||||
ttl, _ := options.Context.Value(ttlContextKey{}).(time.Duration)
|
||||
// TODO nats needs a DefaultTTL option as it does not support per Write TTL ...
|
||||
// FIXME nats has restrictions on the key, we cannot use slashes AFAICT
|
||||
// host, port, clusterid
|
||||
return natsjs.NewStore(
|
||||
append(opts,
|
||||
natsjs.NatsOptions(nats.Options{Name: "TODO"}),
|
||||
natsjs.DefaultTTL(ttl))...,
|
||||
) // TODO test with ocis nats
|
||||
case TypeMemory, "mem", "": // allow existing short form and use as default
|
||||
return store.NewMemoryStore(opts...)
|
||||
default:
|
||||
// try to log an error
|
||||
if options.Logger == nil {
|
||||
options.Logger = logger.DefaultLogger
|
||||
}
|
||||
options.Logger.Logf(logger.ErrorLevel, "unknown store type: '%s', falling back to memory", storeType)
|
||||
return store.NewMemoryStore(opts...)
|
||||
}
|
||||
}
|
||||
+45
-36
@@ -1,50 +1,59 @@
|
||||
package store
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
// Option provides an option to configure the store
|
||||
type Option func(*Options)
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
// Type defines the type of the store
|
||||
func Type(typ string) Option {
|
||||
return func(o *Options) {
|
||||
o.Type = typ
|
||||
type typeContextKey struct{}
|
||||
|
||||
// Store determines the implementation:
|
||||
// - "memory", for a in-memory implementation, which is also the default if noone matches
|
||||
// - "noop", for a noop store (it does nothing)
|
||||
// - "etcd", for etcd
|
||||
// - "nats-js" for nats-js, needs to have TTL configured at creation
|
||||
// - "redis", for redis
|
||||
// - "redis-sentinel", for redis-sentinel
|
||||
// - "ocmem", custom in-memory implementation, with fixed size and optimized prefix
|
||||
// and suffix search
|
||||
func Store(val string) store.Option {
|
||||
return func(o *store.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
|
||||
o.Context = context.WithValue(o.Context, typeContextKey{}, val)
|
||||
}
|
||||
}
|
||||
|
||||
// Addresses defines the addresses where the store can be reached
|
||||
func Addresses(addrs ...string) Option {
|
||||
return func(o *Options) {
|
||||
o.Addresses = addrs
|
||||
type sizeContextKey struct{}
|
||||
|
||||
// Size configures the maximum capacity of the cache for the "ocmem" implementation,
|
||||
// in number of items that the cache can hold per table.
|
||||
// You can use 5000 to make the cache hold up to 5000 elements.
|
||||
// The parameter only affects to the "ocmem" implementation, the rest will ignore it.
|
||||
// If an invalid value is used, the default of 512 will be used instead.
|
||||
func Size(val int) store.Option {
|
||||
return func(o *store.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
|
||||
o.Context = context.WithValue(o.Context, sizeContextKey{}, val)
|
||||
}
|
||||
}
|
||||
|
||||
// Database defines the Database the store should use
|
||||
func Database(db string) Option {
|
||||
return func(o *Options) {
|
||||
o.Database = db
|
||||
}
|
||||
}
|
||||
type ttlContextKey struct{}
|
||||
|
||||
// Table defines the table the store should use
|
||||
func Table(t string) Option {
|
||||
return func(o *Options) {
|
||||
o.Table = t
|
||||
}
|
||||
}
|
||||
// TTL is the time to live for documents stored in the store
|
||||
func TTL(val time.Duration) store.Option {
|
||||
return func(o *store.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
|
||||
// Size defines the maximum capacity of the store.
|
||||
// Only applicable when using "ocmem" store
|
||||
func Size(s int) Option {
|
||||
return func(o *Options) {
|
||||
o.Size = s
|
||||
}
|
||||
}
|
||||
|
||||
// TTL defines the time to life for elements in the store.
|
||||
// Only applicable when using "natsjs" store
|
||||
func TTL(t time.Duration) Option {
|
||||
return func(o *Options) {
|
||||
o.TTL = t
|
||||
o.Context = context.WithValue(o.Context, ttlContextKey{}, val)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,128 +0,0 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
natsjs "github.com/go-micro/plugins/v4/store/nats-js"
|
||||
"github.com/go-micro/plugins/v4/store/redis"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/store/etcd"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/store/memory"
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
var ocMemStore *store.Store
|
||||
|
||||
// Options are the options to configure the store
|
||||
type Options struct {
|
||||
// Type determines the implementation:
|
||||
// * "noop", for a noop store (it does nothing)
|
||||
// * "etcd", for etcd
|
||||
// * "ocmem", custom in-memory implementation, with fixed size and optimized prefix
|
||||
// and suffix search
|
||||
// * "memory", for a in-memory implementation, which is the default if noone matches
|
||||
Type string
|
||||
|
||||
// Address is a list of nodes that the store will use.
|
||||
Addresses []string
|
||||
|
||||
// Size configures the maximum capacity of the cache for
|
||||
// the "ocmem" implementation, in number of items that the cache can hold per table.
|
||||
// You can use 5000 to make the cache hold up to 5000 elements.
|
||||
// The parameter only affects to the "ocmem" implementation, the rest will ignore it.
|
||||
// If an invalid value is used, the default of 512 will be used instead.
|
||||
Size int
|
||||
|
||||
// Database the store should use (optional)
|
||||
Database string
|
||||
|
||||
// Table the store should use (optional)
|
||||
Table string
|
||||
|
||||
// TTL is the time to life for documents stored in the store
|
||||
TTL time.Duration
|
||||
}
|
||||
|
||||
// Create returns a configured key-value store
|
||||
//
|
||||
// Each microservice (or whatever piece is using the store) should use the
|
||||
// options available in the interface's operations to choose the right database
|
||||
// and table to prevent collisions with other microservices.
|
||||
// Recommended approach is to use "services" or "ocis-pkg" for the database,
|
||||
// and "services/<service-name>/" or "ocis-pkg/<pkg>/" for the package name.
|
||||
func Create(opts ...Option) store.Store {
|
||||
options := &Options{}
|
||||
for _, o := range opts {
|
||||
o(options)
|
||||
}
|
||||
|
||||
storeopts := storeOptions(options)
|
||||
|
||||
switch options.Type {
|
||||
default:
|
||||
// TODO: better to error in default case?
|
||||
fallthrough
|
||||
case "mem":
|
||||
return store.NewMemoryStore(storeopts...)
|
||||
case "noop":
|
||||
return store.NewNoopStore(storeopts...)
|
||||
case "etcd":
|
||||
return etcd.NewEtcdStore(storeopts...)
|
||||
case "redis":
|
||||
// FIXME redis plugin does not support redis cluster, sentinel or ring -> needs upstream patch or our implementation
|
||||
return redis.NewStore(storeopts...)
|
||||
case "ocmem":
|
||||
if ocMemStore == nil {
|
||||
var memStore store.Store
|
||||
|
||||
sizeNum := options.Size
|
||||
if sizeNum <= 0 {
|
||||
memStore = memory.NewMultiMemStore()
|
||||
} else {
|
||||
memStore = memory.NewMultiMemStore(
|
||||
store.WithContext(
|
||||
memory.NewContext(
|
||||
context.Background(),
|
||||
map[string]interface{}{
|
||||
"maxCap": sizeNum,
|
||||
},
|
||||
)),
|
||||
)
|
||||
}
|
||||
ocMemStore = &memStore
|
||||
}
|
||||
return *ocMemStore
|
||||
case "nats-js":
|
||||
// TODO nats needs a DefaultTTL option as it does not support per Write TTL ...
|
||||
// FIXME nats has restrictions on the key, we cannot use slashes AFAICT
|
||||
// host, port, clusterid
|
||||
return natsjs.NewStore(
|
||||
append(storeopts,
|
||||
natsjs.NatsOptions(nats.Options{Name: "TODO"}),
|
||||
natsjs.DefaultTTL(options.TTL),
|
||||
)...,
|
||||
) // TODO test with ocis nats
|
||||
}
|
||||
}
|
||||
|
||||
func storeOptions(o *Options) []store.Option {
|
||||
var opts []store.Option
|
||||
|
||||
if o.Addresses != nil {
|
||||
opts = append(opts, store.Nodes(o.Addresses...))
|
||||
}
|
||||
|
||||
if o.Database != "" {
|
||||
opts = append(opts, store.Database(o.Database))
|
||||
|
||||
}
|
||||
|
||||
if o.Table != "" {
|
||||
opts = append(opts, store.Table(o.Table))
|
||||
|
||||
}
|
||||
|
||||
return opts
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user