Use go-micro store to cache the roles (#4337)

* Use go-micro store to cache the roles

Add custom in-memory implementation

* replace redis with custom etcd implementation

* adjust table name for the cache in the roles manager

* Fix tests

* Fix sonarcloud issues

* Refactor for sonarcloud

* Allow configuration of cache per service

* Reuse parent context in etcd implementation
This commit is contained in:
Juan Pablo Villafañez
2022-09-16 15:42:47 +02:00
committed by GitHub
parent b8cce99e7a
commit 6ee4a084a2
23 changed files with 3238 additions and 44 deletions

4
go.mod
View File

@@ -7,6 +7,7 @@ require (
github.com/Masterminds/semver v1.5.0
github.com/MicahParks/keyfunc v1.2.2
github.com/ReneKroon/ttlcache/v2 v2.11.0
github.com/armon/go-radix v1.0.0
github.com/blevesearch/bleve/v2 v2.3.4
github.com/blevesearch/bleve_index_api v1.0.3
github.com/coreos/go-oidc/v3 v3.4.0
@@ -65,6 +66,7 @@ require (
github.com/xhit/go-simple-mail/v2 v2.11.0
go-micro.dev/v4 v4.8.1
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/client/v3 v3.5.2
go.opencensus.io v0.23.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.34.0
go.opentelemetry.io/otel v1.10.0
@@ -101,7 +103,6 @@ require (
github.com/alexedwards/argon2id v0.0.0-20211130144151-3585854a6387 // indirect
github.com/amoghe/go-crypt v0.0.0-20220222110647-20eada5f5964 // indirect
github.com/armon/go-metrics v0.3.10 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aws/aws-sdk-go v1.44.94 // indirect
github.com/beevik/etree v1.1.0 // indirect
@@ -262,7 +263,6 @@ require (
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.etcd.io/etcd/api/v3 v3.5.2 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.2 // indirect
go.etcd.io/etcd/client/v3 v3.5.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.1 // indirect

2
go.sum
View File

@@ -292,8 +292,6 @@ github.com/crewjam/saml v0.4.6 h1:XCUFPkQSJLvzyl4cW9OvpWUbRf0gE7VUpU8ZnilbeM4=
github.com/crewjam/saml v0.4.6/go.mod h1:ZBOXnNPFzB3CgOkRm7Nd6IVdkG+l/wF+0ZXLqD96t1A=
github.com/cs3org/go-cs3apis v0.0.0-20220818202316-e92afdddac6d h1:toyZ7IsXlUdEPZ/IG8fg7hbM8HcLPY0bkX4FKBmgLVI=
github.com/cs3org/go-cs3apis v0.0.0-20220818202316-e92afdddac6d/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.10.1-0.20220915071600-3358dc72a980 h1:siIHxgMHWCxERsHPYwHL7Pno27URyqjV2np9Mh1U84g=
github.com/cs3org/reva/v2 v2.10.1-0.20220915071600-3358dc72a980/go.mod h1:+BYVpRV8g1hL8wF3+3BunL9BKPsXVyJYmH8COxq/V7Y=
github.com/cs3org/reva/v2 v2.10.1-0.20220915095422-4b099c09a66c h1:pvbsnSl5WpS6PkSR4glwR8OJGrRdZASajAJtNwp9E+Y=
github.com/cs3org/reva/v2 v2.10.1-0.20220915095422-4b099c09a66c/go.mod h1:+BYVpRV8g1hL8wF3+3BunL9BKPsXVyJYmH8COxq/V7Y=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=

View File

@@ -57,8 +57,9 @@ type Runtime struct {
type Config struct {
*shared.Commons `yaml:"shared"`
Tracing *shared.Tracing `yaml:"tracing"`
Log *shared.Log `yaml:"log"`
Tracing *shared.Tracing `yaml:"tracing"`
Log *shared.Log `yaml:"log"`
CacheStore *shared.CacheStore `yaml:"cache_store"`
Mode Mode // DEPRECATED
File string

View File

@@ -48,6 +48,9 @@ func EnsureDefaults(cfg *config.Config) {
if cfg.TokenManager == nil {
cfg.TokenManager = &shared.TokenManager{}
}
if cfg.CacheStore == nil {
cfg.CacheStore = &shared.CacheStore{}
}
}
// EnsureCommons copies applicable parts of the oCIS config into the commons part
@@ -81,6 +84,16 @@ func EnsureCommons(cfg *config.Config) {
cfg.Commons.Tracing = &shared.Tracing{}
}
if cfg.CacheStore != nil {
cfg.Commons.CacheStore = &shared.CacheStore{
Type: cfg.CacheStore.Type,
Address: cfg.CacheStore.Address,
Size: cfg.CacheStore.Size,
}
} else {
cfg.Commons.CacheStore = &shared.CacheStore{}
}
// copy token manager to the commons part if set
if cfg.TokenManager != nil {
cfg.Commons.TokenManager = cfg.TokenManager

View File

@@ -2,16 +2,26 @@ package roles
import (
"context"
"time"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ocisstore "github.com/owncloud/ocis/v2/ocis-pkg/store"
settingsmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/settings/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"go-micro.dev/v4/store"
"google.golang.org/protobuf/encoding/protojson"
)
const (
cacheDatabase = "ocis-pkg"
cacheTableName = "ocis-pkg/roles"
cacheTTL = time.Hour
)
// Manager manages a cache of roles by fetching unknown roles from the settings.RoleService.
//
// NOTE: the rendered diff interleaved the old field (`cache cache`) with the
// new one; this is the resolved post-change struct.
type Manager struct {
	// logger reports non-fatal cache failures (see List)
	logger log.Logger
	// cache is the go-micro store holding protojson-marshalled role bundles
	cache store.Store
	// roleService fetches role bundles that aren't cached yet
	roleService settingssvc.RoleService
}
@@ -19,8 +29,9 @@ type Manager struct {
// NewManager builds a role manager from the given options.
// FIX: the logger from the options was never assigned, so `m.logger` stayed
// a zero-value logger and the configured log settings were silently dropped
// (it is used later, e.g. "failed to cache roles").
func NewManager(o ...Option) Manager {
	opts := newOptions(o...)
	nStore := ocisstore.GetStore(opts.storeOptions)
	return Manager{
		logger:      opts.logger,
		cache:       nStore,
		roleService: opts.roleService,
	}
}
@@ -31,10 +42,26 @@ func (m *Manager) List(ctx context.Context, roleIDs []string) []*settingsmsg.Bun
result := make([]*settingsmsg.Bundle, 0)
lookup := make([]string, 0)
for _, roleID := range roleIDs {
if hit := m.cache.get(roleID); hit == nil {
if records, err := m.cache.Read(roleID, store.ReadFrom(cacheDatabase, cacheTableName)); err != nil {
lookup = append(lookup, roleID)
} else {
result = append(result, hit)
role := &settingsmsg.Bundle{}
found := false
for _, record := range records {
if record.Key == roleID {
if err := protojson.Unmarshal(record.Value, role); err == nil {
// if we can unmarshal the role, append it to the result
// otherwise assume the role wasn't found (data was damaged and
// we need to get the role again)
result = append(result, role)
found = true
break
}
}
}
if !found {
lookup = append(lookup, roleID)
}
}
}
@@ -49,7 +76,20 @@ func (m *Manager) List(ctx context.Context, roleIDs []string) []*settingsmsg.Bun
return nil
}
for _, role := range res.Bundles {
m.cache.set(role.Id, role)
jsonbytes, _ := protojson.Marshal(role)
record := &store.Record{
Key: role.Id,
Value: jsonbytes,
Expiry: cacheTTL,
}
err := m.cache.Write(
record,
store.WriteTo(cacheDatabase, cacheTableName),
store.WriteTTL(cacheTTL),
)
if err != nil {
m.logger.Debug().Err(err).Msg("failed to cache roles")
}
result = append(result, role)
}
}

View File

@@ -1,37 +1,21 @@
package roles
import (
"time"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ocisstore "github.com/owncloud/ocis/v2/ocis-pkg/store"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
)
// Options are all the possible options.
type Options struct {
size int
ttl time.Duration
logger log.Logger
roleService settingssvc.RoleService
storeOptions ocisstore.OcisStoreOptions
logger log.Logger
roleService settingssvc.RoleService
}
// Option mutates option
type Option func(*Options)
// CacheSize configures the size of the cache in items.
func CacheSize(s int) Option {
return func(o *Options) {
o.size = s
}
}
// CacheTTL rebuilds the cache after the configured duration.
func CacheTTL(ttl time.Duration) Option {
return func(o *Options) {
o.ttl = ttl
}
}
// Logger sets a preconfigured logger
func Logger(logger log.Logger) Option {
return func(o *Options) {
@@ -46,6 +30,12 @@ func RoleService(rs settingssvc.RoleService) Option {
}
}
// StoreOptions configures the go-micro store used as the roles cache.
func StoreOptions(storeOpts ocisstore.OcisStoreOptions) Option {
	return func(o *Options) {
		o.storeOptions = storeOpts
	}
}
func newOptions(opts ...Option) Options {
o := Options{}

View File

@@ -34,11 +34,18 @@ type Reva struct {
Address string `yaml:"address" env:"REVA_GATEWAY" desc:"The CS3 gateway endpoint."`
}
// Commons holds configuration that are common to all services. Each service can then decide whether
type CacheStore struct {
Type string `yaml:"type" env:"OCIS_CACHE_STORE_TYPE" desc:"The type of the cache store. Valid options are \"noop\", \"ocmem\", \"etcd\" and \"memory\""`
Address string `yaml:"address" env:"OCIS_CACHE_STORE_ADDRESS" desc:"a comma-separated list of addresses to connect to. Only for etcd"`
Size int `yaml:"size" env:"OCIS_CACHE_STORE_SIZE" desc:"Maximum size for the cache store. Only ocmem will use this option, in number of items per table. The rest will ignore the option and can grow indefinitely"`
}
// Commons holds configuration that are common to all extensions. Each extension can then decide whether
// to overwrite its values.
type Commons struct {
Log *Log `yaml:"log"`
Tracing *Tracing `yaml:"tracing"`
CacheStore *CacheStore `yaml:"cache_store"`
OcisURL string `yaml:"ocis_url" env:"OCIS_URL" desc:"URL, where oCIS is reachable for users."`
TokenManager *TokenManager `mask:"struct" yaml:"token_manager"`
Reva *Reva `yaml:"reva"`

531
ocis-pkg/store/etcd/etcd.go Normal file
View File

@@ -0,0 +1,531 @@
package etcd
import (
"context"
"encoding/json"
"strings"
"time"
"go-micro.dev/v4/store"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/namespace"
)
const (
	// Each record is stored twice: under its plain key in the ".prefix"
	// namespace (for prefix searches) and under its reversed key in the
	// ".suffix" namespace (for suffix searches).
	prefixNS = ".prefix"
	suffixNS = ".suffix"
)

// EtcdStore is a go-micro store implementation backed by an etcd cluster.
type EtcdStore struct {
	// options holds the store options passed to Init
	options store.Options
	// client is the underlying etcd v3 client, (re)created by setupClient
	client *clientv3.Client
}
// NewEtcdStore creates a new go-micro store backed by etcd and initializes
// it with the provided options.
func NewEtcdStore(opts ...store.Option) store.Store {
	s := new(EtcdStore)
	_ = s.Init(opts...)
	return s
}
// getCtx returns a request context with a 10-second timeout, derived from
// the context configured in the options (context.TODO() when none is set).
// The caller must invoke the returned CancelFunc.
func (es *EtcdStore) getCtx() (context.Context, context.CancelFunc) {
	parent := es.options.Context
	if parent == nil {
		parent = context.TODO()
	}
	return context.WithTimeout(parent, 10*time.Second)
}
// Setup the etcd client based on the current options. The old client (if any)
// will be closed.
// Currently, only the etcd nodes are configurable. If no node is provided,
// it will use the "127.0.0.1:2379" node.
// Context timeout is setup to 10 seconds, and dial timeout to 2 seconds.
//
// FIX: the error from clientv3.New was silently discarded, which could leave
// es.client nil and panic on first use; it is now returned to the caller.
func (es *EtcdStore) setupClient() error {
	if es.client != nil {
		es.client.Close()
	}

	endpoints := []string{"127.0.0.1:2379"}
	if len(es.options.Nodes) > 0 {
		endpoints = es.options.Nodes
	}

	cli, err := clientv3.New(clientv3.Config{
		DialTimeout: 2 * time.Second,
		Endpoints:   endpoints,
	})
	if err != nil {
		return err
	}
	es.client = cli
	return nil
}

// Initialize the go-micro store implementation.
// Currently, only the nodes are configurable, the rest of the options
// will be ignored. Returns an error if the etcd client can't be created.
func (es *EtcdStore) Init(opts ...store.Option) error {
	optList := store.Options{}
	for _, opt := range opts {
		opt(&optList)
	}
	es.options = optList
	return es.setupClient()
}
// Options returns the store options currently in use.
func (es *EtcdStore) Options() store.Options {
	return es.options
}
// Get the effective TTL, as int64 number of seconds. It will prioritize
// the TTL set in the options, then the expiry time in the options, and
// finally the one set as part of the record.
//
// FIX: the previous integer division truncated positive sub-second TTLs to
// 0, and 0 means "no lease" to the caller (Write), so a short TTL silently
// became a permanent entry. Positive sub-second durations now round up to 1.
func getEffectiveTTL(r *store.Record, opts store.WriteOptions) int64 {
	// set base ttl duration based on the record
	duration := r.Expiry
	// overwrite ttl duration based on the options' expiration time
	if !opts.Expiry.IsZero() {
		// options.Expiry is a time.Time, r.Expiry is a time.Duration
		duration = time.Until(opts.Expiry)
	}
	// TTL option takes precedence over expiration time
	if opts.TTL != 0 {
		duration = opts.TTL
	}
	// use milliseconds because it returns an int64 instead of a float64
	secs := duration.Milliseconds() / 1000
	if secs == 0 && duration > 0 {
		// round a positive sub-second TTL up instead of dropping the lease
		secs = 1
	}
	return secs
}
// Write the record into the etcd. The record will be duplicated in order to
// find it by prefix or by suffix. This means that it will take double space.
// Note that this is an implementation detail and it will be handled
// transparently.
//
// Database and Table options will be used to provide a different prefix to
// the key. Each service using this store should use a different database+table
// combination in order to prevent key collisions.
//
// Due to how TTLs are implemented in etcd, the minimum valid TTL seems to
// be 2 secs. Using lower values or even negative values will force the etcd
// server to use the minimum value instead.
// In addition, getting a lease for the TTL and attach it to the target key
// are 2 different operations that can't be sent as part of a transaction.
// This means that it's possible to get a lease and have that lease expire
// before attaching it to the key. Errors are expected to happen if this is
// the case, and no key will be inserted.
// According to etcd documentation, the key is guaranteed to be available
// AT LEAST the TTL duration. This means that the key might be available for
// a longer period of time in special circumstances.
//
// It's recommended to use a minimum TTL of 10 secs or higher (or not to use
// TTL) in order to prevent problematic scenarios.
// Write stores the record in etcd (see the commentary above for the
// duplication and TTL caveats).
// FIX: getEffectiveTTL was computed twice (once into effectiveTTL and again
// inside the Grant call); the precomputed value is now reused.
func (es *EtcdStore) Write(r *store.Record, opts ...store.WriteOption) error {
	wopts := store.WriteOptions{}
	for _, opt := range opts {
		opt(&wopts)
	}
	prefix := buildPrefix(wopts.Database, wopts.Table, prefixNS)
	suffix := buildPrefix(wopts.Database, wopts.Table, suffixNS)
	kv := es.client.KV

	jsonRecord, err := json.Marshal(r)
	if err != nil {
		return err
	}
	jsonStringRecord := string(jsonRecord)

	effectiveTTL := getEffectiveTTL(r, wopts)
	var opOpts []clientv3.OpOption
	if effectiveTTL != 0 {
		lease := es.client.Lease
		ctx, cancel := es.getCtx()
		gResp, gErr := lease.Grant(ctx, effectiveTTL)
		cancel()
		if gErr != nil {
			return gErr
		}
		opOpts = []clientv3.OpOption{clientv3.WithLease(gResp.ID)}
	} else {
		// lease ID 0 means no lease -> the keys won't expire
		opOpts = []clientv3.OpOption{clientv3.WithLease(0)}
	}

	ctx, cancel := es.getCtx()
	// write both copies (plain and reversed key) in a single transaction so
	// the prefix and suffix namespaces stay consistent
	_, err = kv.Txn(ctx).Then(
		clientv3.OpPut(prefix+r.Key, jsonStringRecord, opOpts...),
		clientv3.OpPut(suffix+reverseString(r.Key), jsonStringRecord, opOpts...),
	).Commit()
	cancel()
	return err
}
// processGetResponse unmarshals the records contained in a Get response,
// skipping the first `offset` entries.
func processGetResponse(resp *clientv3.GetResponse, offset int64) ([]*store.Record, error) {
	records := make([]*store.Record, 0, len(resp.Kvs))
	for i, kv := range resp.Kvs {
		if int64(i) < offset {
			// entries before the offset are discarded
			continue
		}
		rec := new(store.Record)
		if err := json.Unmarshal(kv.Value, rec); err != nil {
			return nil, err
		}
		records = append(records, rec)
	}
	return records, nil
}
// processListResponse extracts the keys contained in a Get response,
// skipping the first `offset` entries.
// With reverse set, each key is reversed before being returned ("zyxw"
// becomes "wxyz"); this undoes the reversed storage used for suffix
// searches.
func processListResponse(resp *clientv3.GetResponse, offset int64, reverse bool) ([]string, error) {
	keys := make([]string, 0, len(resp.Kvs))
	for i, kv := range resp.Kvs {
		if int64(i) < offset {
			// entries before the offset are discarded
			continue
		}
		k := string(kv.Key)
		if reverse {
			k = reverseString(k)
		}
		keys = append(keys, k)
	}
	return keys, nil
}
// directRead performs an exact key lookup and returns the (single-element)
// result, or store.ErrNotFound when the key doesn't exist.
func (es *EtcdStore) directRead(kv clientv3.KV, key string) ([]*store.Record, error) {
	ctx, cancel := es.getCtx()
	defer cancel()

	resp, err := kv.Get(ctx, key)
	if err != nil {
		return nil, err
	}
	if len(resp.Kvs) == 0 {
		return nil, store.ErrNotFound
	}
	return processGetResponse(resp, 0)
}
// Perform a prefix read with limit and offset. A limit of 0 will return all
// results. Usage of offset isn't recommended because those results must still
// be fetched from the server in order to be discarded.
func (es *EtcdStore) prefixRead(kv clientv3.KV, key string, limit, offset int64) ([]*store.Record, error) {
	getOptions := []clientv3.OpOption{
		clientv3.WithPrefix(),
	}
	if limit > 0 {
		// the server can't skip the offset for us, so request limit+offset
		// entries and discard the first `offset` ones client-side
		getOptions = append(getOptions, clientv3.WithLimit(limit+offset))
	}
	ctx, cancel := es.getCtx()
	resp, err := kv.Get(ctx, key, getOptions...)
	cancel()
	if err != nil {
		return nil, err
	}
	return processGetResponse(resp, offset)
}
// Perform a prefix + suffix read with limit and offset. A limit of 0 will
// return all results found. Usage of this function is discouraged because
// we'll have to request a prefix search and match the suffix manually. This
// means that even with a limit = 3 and offset = 0, there is no guarantee
// we'll find all the results we need within that range, and we'll likely
// need to request more data from the server. The number of requests we need
// to perform is unknown and might cause load.
//
// FIX: an empty firstKeyOut (prefix made entirely of "\xff" bytes) was passed
// to WithRange unchecked; it is now dropped, matching prefixSuffixList.
func (es *EtcdStore) prefixSuffixRead(kv clientv3.KV, prefix, suffix string, limit, offset int64) ([]*store.Record, error) {
	firstKeyOut := firstKeyOutOfPrefixString(prefix)
	getOptions := []clientv3.OpOption{
		clientv3.WithRange(firstKeyOut),
	}
	if firstKeyOut == "" {
		// could happen if all bytes of the prefix are "\xff"; remove the
		// WithRange option (same handling as prefixSuffixList)
		getOptions = getOptions[:0]
	}
	if limit > 0 {
		// unlikely to find all the entries we need within offset + limit
		getOptions = append(getOptions, clientv3.WithLimit((limit+offset)*2))
	}

	var currentRecordOffset int64
	result := []*store.Record{}
	initialKey := prefix
	keepGoing := true
	for keepGoing {
		ctx, cancel := es.getCtx()
		resp, respErr := kv.Get(ctx, initialKey, getOptions...)
		cancel()
		if respErr != nil {
			return nil, respErr
		}
		records, err := processGetResponse(resp, 0)
		if err != nil {
			return nil, err
		}
		for _, record := range records {
			if !strings.HasSuffix(record.Key, suffix) {
				continue
			}
			if currentRecordOffset < offset {
				// manually skip records until the requested offset is reached
				currentRecordOffset++
				continue
			}
			if !shouldFinish(int64(len(result)), limit) {
				result = append(result, record)
				if shouldFinish(int64(len(result)), limit) {
					break
				}
			}
		}
		if !resp.More || shouldFinish(int64(len(result)), limit) {
			keepGoing = false
		} else {
			// continue right after the last received key:
			// append byte 0 (nul char) to the last key
			initialKey = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
		}
	}
	return result, nil
}
// Read records from the etcd server based in the key. Database and Table
// options are highly recommended, otherwise we'll use a default one (which
// might not have the requested keys)
//
// If no prefix or suffix option is provided, we'll read the record matching
// the provided key. Note that a list of records will be provided anyway,
// likely with only one record (the one requested)
//
// Prefix and suffix options are supported and should perform fine even with
// a large amount of data. Note that the limit option should also be included
// in order to limit the amount of records we need to fetch.
//
// Note that using both prefix and suffix options at the same time is possible
// but discouraged. A prefix search will be sent to the etcd server, and from
// there we'll manually pick the records matching the suffix. This might become
// very inefficient since we might need to request more data to the etcd
// multiple times in order to provide the results asked.
// Usage of the offset option is also discouraged because we'll have to request
// records that we'll have to skip manually on our side.
//
// Don't rely on any particular order of the keys. The records are expected to
// be sorted by key except if the suffix option (suffix without prefix) is
// used. In this case, the keys will be sorted based on the reversed key
func (es *EtcdStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
	ropts := store.ReadOptions{}
	for _, opt := range opts {
		opt(&ropts)
	}
	prefix := buildPrefix(ropts.Database, ropts.Table, prefixNS)
	suffix := buildPrefix(ropts.Database, ropts.Table, suffixNS)
	kv := es.client.KV
	// namespaced views over the same KV: one for plain keys, one for
	// reversed keys (suffix searches)
	preKv := namespace.NewKV(kv, prefix)
	sufKv := namespace.NewKV(kv, suffix)
	if ropts.Prefix && ropts.Suffix {
		return es.prefixSuffixRead(preKv, key, key, int64(ropts.Limit), int64(ropts.Offset))
	}
	if ropts.Prefix {
		return es.prefixRead(preKv, key, int64(ropts.Limit), int64(ropts.Offset))
	}
	if ropts.Suffix {
		// suffix-only search: a prefix read over the reversed-key namespace
		return es.prefixRead(sufKv, reverseString(key), int64(ropts.Limit), int64(ropts.Offset))
	}
	return es.directRead(preKv, key)
}
// Delete removes the record stored under the given key. Database and Table
// options are highly recommended, otherwise a default location is used
// (which might not contain the requested key).
//
// Since Write inserts two entries per key (plain and reversed), both
// entries are removed here. This is handled transparently.
func (es *EtcdStore) Delete(key string, opts ...store.DeleteOption) error {
	dopts := store.DeleteOptions{}
	for _, o := range opts {
		o(&dopts)
	}
	prefix := buildPrefix(dopts.Database, dopts.Table, prefixNS)
	suffix := buildPrefix(dopts.Database, dopts.Table, suffixNS)

	ctx, cancel := es.getCtx()
	defer cancel()
	// delete both copies in a single transaction
	_, err := es.client.KV.Txn(ctx).Then(
		clientv3.OpDelete(prefix+key),
		clientv3.OpDelete(suffix+reverseString(key)),
	).Commit()
	return err
}
// List the keys based on the provided prefix. Use the empty string (and no
// limit nor offset) to list all keys available.
// Limit and offset options are available to limit the keys we need to return.
// The reverse option will reverse the keys before returning them. Use it when
// listing the keys from the suffix KV.
//
// Note that values for the keys won't be requested to the etcd server, that's
// why the reverse option is important
func (es *EtcdStore) listKeys(kv clientv3.KV, prefixKey string, limit, offset int64, reverse bool) ([]string, error) {
	getOptions := []clientv3.OpOption{
		clientv3.WithKeysOnly(),
		clientv3.WithPrefix(),
	}
	if limit > 0 {
		// the server can't skip the offset for us, so request limit+offset
		// keys and discard the first `offset` ones client-side
		getOptions = append(getOptions, clientv3.WithLimit(limit+offset))
	}
	ctx, cancel := es.getCtx()
	resp, err := kv.Get(ctx, prefixKey, getOptions...)
	cancel()
	if err != nil {
		return nil, err
	}
	return processListResponse(resp, offset, reverse)
}
// List the keys matching both prefix and suffix, with the provided limit and
// offset. Usage of this function is discouraged because we'll have to match
// the suffix manually on our side, which means we'll likely need to perform
// additional requests to the etcd server to get more results matching all the
// requirements.
func (es *EtcdStore) prefixSuffixList(kv clientv3.KV, prefix, suffix string, limit, offset int64) ([]string, error) {
	firstKeyOut := firstKeyOutOfPrefixString(prefix)
	getOptions := []clientv3.OpOption{
		clientv3.WithKeysOnly(),
		clientv3.WithRange(firstKeyOut),
	}
	if firstKeyOut == "" {
		// could happen if all bytes of the prefix are "\xff"
		getOptions = getOptions[:1] // remove the WithRange option
	}
	if limit > 0 {
		// unlikely to find all the entries we need within offset + limit
		getOptions = append(getOptions, clientv3.WithLimit((limit+offset)*2))
	}
	var currentRecordOffset int64
	result := []string{}
	initialKey := prefix
	keepGoing := true
	// keep requesting pages from etcd until we either gathered enough
	// suffix-matching keys or the server has no more data
	for keepGoing {
		ctx, cancel := es.getCtx()
		resp, respErr := kv.Get(ctx, initialKey, getOptions...)
		cancel()
		if respErr != nil {
			return nil, respErr
		}
		keys, err := processListResponse(resp, 0, false)
		if err != nil {
			return nil, err
		}
		for _, key := range keys {
			if !strings.HasSuffix(key, suffix) {
				continue
			}
			if currentRecordOffset < offset {
				// manually skip keys until the requested offset is reached
				currentRecordOffset++
				continue
			}
			if !shouldFinish(int64(len(result)), limit) {
				result = append(result, key)
				if shouldFinish(int64(len(result)), limit) {
					break
				}
			}
		}
		if !resp.More || shouldFinish(int64(len(result)), limit) {
			keepGoing = false
		} else {
			initialKey = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) // append byte 0 (nul char) to the last key
		}
	}
	return result, nil
}
// List the keys available in the etcd server. Database and Table
// options are highly recommended, otherwise we'll use a default one (which
// might not have the requested keys)
//
// With the Database and Table options, all the keys returned will be within
// that database and table. Each service is expected to use a different
// database + table, so using those options will list only the keys used by
// that particular service.
//
// Prefix and suffix options are available along with the limit and offset
// ones.
//
// Using prefix and suffix options at the same time is discouraged because
// the suffix matching will be done on our side, and we'll likely need to
// perform multiple requests to get the requested results. Note that using
// just the suffix option is fine.
// In addition, using the offset option is also discouraged because we'll
// need to request additional keys that will be skipped on our side.
func (es *EtcdStore) List(opts ...store.ListOption) ([]string, error) {
	lopts := store.ListOptions{}
	for _, opt := range opts {
		opt(&lopts)
	}
	prefix := buildPrefix(lopts.Database, lopts.Table, prefixNS)
	suffix := buildPrefix(lopts.Database, lopts.Table, suffixNS)
	kv := es.client.KV
	// namespaced views over the same KV: one for plain keys, one for
	// reversed keys (suffix searches)
	preKv := namespace.NewKV(kv, prefix)
	sufKv := namespace.NewKV(kv, suffix)
	if lopts.Prefix != "" && lopts.Suffix != "" {
		return es.prefixSuffixList(preKv, lopts.Prefix, lopts.Suffix, int64(lopts.Limit), int64(lopts.Offset))
	}
	if lopts.Prefix != "" {
		return es.listKeys(preKv, lopts.Prefix, int64(lopts.Limit), int64(lopts.Offset), false)
	}
	if lopts.Suffix != "" {
		// suffix-only search: list over the reversed-key namespace, then
		// un-reverse the keys (reverse = true)
		return es.listKeys(sufKv, reverseString(lopts.Suffix), int64(lopts.Limit), int64(lopts.Offset), true)
	}
	return es.listKeys(preKv, "", int64(lopts.Limit), int64(lopts.Offset), false)
}
// Close releases the underlying etcd client connection.
func (es *EtcdStore) Close() error {
	return es.client.Close()
}

// String returns the name of this store implementation.
func (es *EtcdStore) String() string {
	return "Etcd"
}

View File

@@ -0,0 +1,65 @@
package etcd
import (
"strings"
)
// shouldFinish reports whether we already collected enough results: it
// returns true only when a limit is set (limit != 0) AND the number of
// results has reached that limit. A limit of 0 means "no limit", so it
// never finishes early.
//
// FIX: the previous doc comment stated the condition backwards ("the limit
// ... is greater or equal to the number of results"); the behavior itself is
// unchanged.
func shouldFinish(numberOfResults, limit int64) bool {
	return limit != 0 && numberOfResults >= limit
}
// firstKeyOutOfPrefix returns the first byte sequence that sorts just after
// every key sharing the given prefix: the last non-0xff byte is incremented
// and any trailing 0xff bytes are dropped. If every byte is 0xff there is no
// such key and an empty slice is returned.
// The input is never modified. Note that this works on bytes, not runes, so
// it might be ill-suited for multi-byte chars.
func firstKeyOutOfPrefix(src []byte) []byte {
	dst := append([]byte(nil), src...)
	for i := len(dst) - 1; i >= 0; i-- {
		if dst[i] != 0xff {
			dst[i]++
			return dst[:i+1]
		}
	}
	return dst[:0]
}
// firstKeyOutOfPrefixString returns the first key that sorts just after
// every key sharing the given prefix. It operates on the string's bytes
// (the last non-0xff byte is incremented, trailing 0xff bytes dropped), so
// it might be ill-suited if the string contains multi-byte chars. If every
// byte is 0xff the empty string is returned.
func firstKeyOutOfPrefixString(src string) string {
	b := []byte(src) // conversion copies, so src stays untouched
	i := len(b) - 1
	for ; i >= 0; i-- {
		if b[i] != 0xff {
			b[i]++
			break
		}
	}
	return string(b[:i+1])
}
// reverseString returns the string with its runes in reverse order.
func reverseString(s string) string {
	runes := []rune(s)
	n := len(runes)
	out := make([]rune, n)
	for i, r := range runes {
		out[n-1-i] = r
	}
	return string(out)
}
// buildPrefix joins the given parts into a prefix string where every part
// (including the last) is followed by a '/'. With no parts, the empty
// string is returned.
//
// For example `buildPrefix("P1", "P2", "P3")` returns "P1/P2/P3/".
func buildPrefix(parts ...string) string {
	if len(parts) == 0 {
		return ""
	}
	return strings.Join(parts, "/") + "/"
}

View File

@@ -0,0 +1,513 @@
package memory
import (
"container/list"
"context"
"strings"
"sync"
"time"
"github.com/armon/go-radix"
"go-micro.dev/v4/store"
)
// In-memory store implementation using radix tree for fast prefix and suffix
// searches.
// Insertions are expected to be a bit slow due to the data structures, but
// searches are expected to be fast, including exact key search, as well as
// prefix and suffix searches (based on the number of elements to be returned).
// Prefix+suffix search isn't optimized and will depend on how many items we
// need to skip.
// It's also recommended to use reasonable limits when using prefix or suffix
// searches because we'll need to traverse the data structures to provide the
// results. The traversal will stop a soon as we have the required number of
// results, so it will be faster if we use a short limit.
//
// The overall performance will depend on how the radix trees are built.
// The number of elements won't directly affect the performance but how the
// keys are dispersed. The more dispersed the keys are, the faster the search
// will be, regardless of the number of keys. This happens due to the number
// of hops we need to do to reach the target element.
// This also mean that if the keys are too similar, the performance might be
// slower than expected even if the number of elements isn't too big.
// MemStore holds the records in two radix trees (one keyed by the plain
// key, one by the reversed key) plus an eviction list in LRU order.
type MemStore struct {
	// preRadix maps plain keys to *list.Element entries of evictionList
	preRadix *radix.Tree
	// sufRadix maps REVERSED keys to the same *list.Element entries
	sufRadix *radix.Tree
	// evictionList keeps records ordered front = oldest, back = newest
	evictionList *list.List
	// options holds the go-micro store options passed to Init
	options store.Options
	// lockGlob guards the radix trees, the eviction list and the options
	lockGlob sync.RWMutex
	lockEvicList sync.RWMutex // Read operation will modify the eviction list
}

// storeRecord is the internal representation of a record, carrying both the
// relative Expiry duration and the resolved absolute ExpiresAt instant.
type storeRecord struct {
	Key string
	Value []byte
	Metadata map[string]interface{}
	Expiry time.Duration
	ExpiresAt time.Time
}
// contextKey is a private type for context values, so keys defined here
// can't collide with keys from other packages.
type contextKey string

// targetContextKey is the context key under which the parameter map
// (see NewContext) is stored.
var targetContextKey contextKey

// Prepare a context to be used with the memory implementation. The context
// is used to set up custom parameters to the specific implementation.
// In this case, you can configure the maximum capacity for the MemStore
// implementation as shown below.
// ```
// cache := NewMemStore(
// store.WithContext(
// NewContext(
// ctx,
// map[string]interface{}{
// "maxCap": 50,
// },
// ),
// ),
// )
// ```
//
// Available options for the MemStore are:
// * "maxCap" -> 512 (int) The maximum number of elements the cache will hold.
// Adding additional elements will remove old elements to ensure we aren't over
// the maximum capacity.
//
// For convenience, this can also be used for the MultiMemStore.
func NewContext(ctx context.Context, storeParams map[string]interface{}) context.Context {
	return context.WithValue(ctx, targetContextKey, storeParams)
}
// NewMemStore creates a new MemStore instance initialized with the given
// options.
func NewMemStore(opts ...store.Option) store.Store {
	ms := &MemStore{}
	_ = ms.Init(opts...)
	return ms
}
// Get the maximum capacity configured. If no maxCap has been configured
// (via `NewContext`), 512 will be used as maxCap.
//
// FIX: the type assertions on the context value and on the "maxCap" entry
// were unchecked and would panic on malformed values; they now fall back to
// the default instead.
func (m *MemStore) getMaxCap() int {
	const defaultMaxCap = 512
	ctx := m.options.Context
	if ctx == nil {
		return defaultMaxCap
	}
	ctxValue := ctx.Value(targetContextKey)
	if ctxValue == nil {
		return defaultMaxCap
	}
	additionalOpts, ok := ctxValue.(map[string]interface{})
	if !ok {
		// the value stored under our key isn't the expected parameter map
		return defaultMaxCap
	}
	if confCap, ok := additionalOpts["maxCap"].(int); ok {
		return confCap
	}
	return defaultMaxCap
}
// Init (re)initializes the MemStore. If the MemStore was already in use,
// all internal structures are reset and the options passed here replace the
// previous ones.
func (m *MemStore) Init(opts ...store.Option) error {
	var options store.Options
	for _, o := range opts {
		o(&options)
	}

	m.lockGlob.Lock()
	defer m.lockGlob.Unlock()

	m.preRadix = radix.New()
	m.sufRadix = radix.New()
	m.evictionList = list.New()
	m.options = options
	return nil
}
// Options returns the store options currently in use.
func (m *MemStore) Options() store.Options {
	m.lockGlob.RLock()
	defer m.lockGlob.RUnlock()
	return m.options
}
// Write the record in the MemStore.
// Note that Database and Table options will be ignored.
// Expiration options will take the following precedence:
// TTL option > expiration option > TTL record
//
// New elements will take the last position in the eviction list. Updating
// an element will also move the element to the last position.
//
// Although not recommended, new elements might be inserted with an
// already-expired date
//
// FIX: on eviction, the suffix radix tree was deleted using the plain key,
// but entries are inserted there under the REVERSED key (see the Insert
// below), so evicted records left stale entries behind; the delete now uses
// the reversed key as well.
func (m *MemStore) Write(r *store.Record, opts ...store.WriteOption) error {
	var element *list.Element
	wopts := store.WriteOptions{}
	for _, opt := range opts {
		opt(&wopts)
	}
	cRecord := toStoreRecord(r, wopts)

	m.lockGlob.Lock()
	defer m.lockGlob.Unlock()

	ele, exists := m.preRadix.Get(cRecord.Key)
	if exists {
		// update in place; both radix trees point to the same element
		element = ele.(*list.Element)
		element.Value = cRecord
		m.evictionList.MoveToBack(element)
	} else {
		if m.evictionList.Len() >= m.getMaxCap() {
			// at capacity -> evict the oldest entry (front of the list)
			elementToDelete := m.evictionList.Front()
			if elementToDelete != nil {
				recordToDelete := elementToDelete.Value.(*storeRecord)
				_, _ = m.preRadix.Delete(recordToDelete.Key)
				_, _ = m.sufRadix.Delete(reverseString(recordToDelete.Key))
				m.evictionList.Remove(elementToDelete)
			}
		}
		element = m.evictionList.PushBack(cRecord)
		_, _ = m.preRadix.Insert(cRecord.Key, element)
		_, _ = m.sufRadix.Insert(reverseString(cRecord.Key), element)
	}
	return nil
}
// Read the key from the MemStore. A list of records will be returned even if
// you're asking for the exact key (only one record is expected in that case).
//
// Reading the exact element will move such element to the last position of
// the eviction list. This WON'T apply for prefix and / or suffix reads.
//
// This method guarantees that no expired element will be returned. For the
// case of exact read, the element will be removed and a "not found" error
// will be returned.
// For prefix and suffix reads, all the expired elements that we traverse
// through will be removed. This includes the elements we need to skip as
// well as the elements that might have gotten into the result. Note that
// the elements that are over the limit won't be touched
//
// All read options are supported except Database and Table.
//
// For prefix and prefix+suffix options, the records will be returned in
// alphabetical order on the keys.
// For the suffix option (just suffix, no prefix), the records will be
// returned in alphabetical order after reversing the keys. This means,
// reverse all the keys and then sort them alphabetically. This just affects
// the sorting order; the keys will be returned as expected.
// This means that ["aboz", "caaz", "ziuz"] will be sorted as ["caaz", "aboz", "ziuz"]
// for the key "z" as suffix.
//
// Note that offsets are supported but not recommended. There is no direct access
// to the record X. We'd need to skip all the records until we reach the specified
// offset, which could be problematic.
// Performance for prefix and suffix searches should be good assuming we limit
// the number of results we need to return.
func (m *MemStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
	var element *list.Element
	ropts := store.ReadOptions{}
	for _, opt := range opts {
		opt(&ropts)
	}
	if !ropts.Prefix && !ropts.Suffix {
		// exact-key lookup
		m.lockGlob.RLock()
		ele, exists := m.preRadix.Get(key)
		if !exists {
			m.lockGlob.RUnlock()
			return nil, store.ErrNotFound
		}
		element = ele.(*list.Element)
		record := element.Value.(*storeRecord)
		if record.Expiry != 0 && record.ExpiresAt.Before(time.Now()) {
			// record expired -> need to delete
			// NOTE(review): the upgrade from the read lock to the write lock
			// below is not atomic; another goroutine could modify or
			// re-insert this key in between, in which case the deletes act
			// on the newer state — confirm this is acceptable for a cache
			m.lockGlob.RUnlock()
			m.lockGlob.Lock()
			defer m.lockGlob.Unlock()
			m.evictionList.Remove(element)
			_, _ = m.preRadix.Delete(key)
			_, _ = m.sufRadix.Delete(reverseString(key))
			return nil, store.ErrNotFound
		}
		// the eviction list is mutated while only the shared (read) lock on
		// lockGlob is held, so the dedicated lockEvicList serializes the
		// MoveToBack calls between concurrent readers; writers are excluded
		// because they take lockGlob exclusively
		m.lockEvicList.Lock()
		m.evictionList.MoveToBack(element)
		m.lockEvicList.Unlock()
		foundRecords := []*store.Record{
			fromStoreRecord(record),
		}
		m.lockGlob.RUnlock()
		return foundRecords, nil
	}
	// prefix and / or suffix search: walk the radix trees collecting both the
	// matching records and any expired elements found along the way
	records := []*store.Record{}
	expiredElements := make(map[string]*list.Element)
	m.lockGlob.RLock()
	if ropts.Prefix && ropts.Suffix {
		// if we need to check both prefix and suffix, go through the
		// prefix tree and skip elements without the right suffix. We
		// don't need to check the suffix tree because the elements
		// must be in both trees
		m.preRadix.WalkPrefix(key, m.radixTreeCallBackCheckSuffix(ropts.Offset, ropts.Limit, key, &records, expiredElements))
	} else {
		if ropts.Prefix {
			m.preRadix.WalkPrefix(key, m.radixTreeCallBack(ropts.Offset, ropts.Limit, &records, expiredElements))
		}
		if ropts.Suffix {
			// the suffix tree is keyed by the reversed keys, so a suffix
			// search is a prefix walk over the reversed key
			m.sufRadix.WalkPrefix(reverseString(key), m.radixTreeCallBack(ropts.Offset, ropts.Limit, &records, expiredElements))
		}
	}
	m.lockGlob.RUnlock()
	// if there are expired elements, get a write lock and delete the expired elements
	if len(expiredElements) > 0 {
		m.lockGlob.Lock()
		for key, element := range expiredElements {
			m.evictionList.Remove(element)
			_, _ = m.preRadix.Delete(key)
			_, _ = m.sufRadix.Delete(reverseString(key))
		}
		m.lockGlob.Unlock()
	}
	return records, nil
}
// Delete removes the record stored under key. No error is returned if the
// key is missing.
//
// Database and Table options aren't supported.
func (m *MemStore) Delete(key string, opts ...store.DeleteOption) error {
	m.lockGlob.Lock()
	defer m.lockGlob.Unlock()

	ele, found := m.preRadix.Get(key)
	if !found {
		return nil
	}
	m.evictionList.Remove(ele.(*list.Element))
	_, _ = m.preRadix.Delete(key)
	_, _ = m.sufRadix.Delete(reverseString(key))
	return nil
}
// List the keys currently used in the MemStore
//
// All options are supported except Database and Table
//
// For prefix and prefix+suffix options, the keys will be returned in
// alphabetical order.
// For the suffix option (just suffix, no prefix), the keys will be
// returned in alphabetical order after reversing the keys. This means,
// reverse all the keys and then sort them alphabetically. This just affects
// the sorting order; the keys will be returned as expected.
// This means that ["aboz", "caaz", "ziuz"] will be sorted as ["caaz", "aboz", "ziuz"]
func (m *MemStore) List(opts ...store.ListOption) ([]string, error) {
	records := []string{}
	expiredElements := make(map[string]*list.Element)
	lopts := store.ListOptions{}
	for _, opt := range opts {
		opt(&lopts)
	}

	m.lockGlob.RLock()
	switch {
	case lopts.Prefix == "" && lopts.Suffix == "":
		// no filter: walk the whole prefix tree
		m.preRadix.Walk(m.radixTreeCallBackKeysOnly(lopts.Offset, lopts.Limit, &records, expiredElements))
	case lopts.Prefix != "" && lopts.Suffix != "":
		// if we need to check both prefix and suffix, go through the
		// prefix tree and skip elements without the right suffix. We
		// don't need to check the suffix tree because the elements
		// must be in both trees
		m.preRadix.WalkPrefix(lopts.Prefix, m.radixTreeCallBackKeysOnlyWithSuffix(lopts.Offset, lopts.Limit, lopts.Suffix, &records, expiredElements))
	case lopts.Prefix != "":
		m.preRadix.WalkPrefix(lopts.Prefix, m.radixTreeCallBackKeysOnly(lopts.Offset, lopts.Limit, &records, expiredElements))
	default:
		// suffix only: the suffix tree is keyed by the reversed keys
		m.sufRadix.WalkPrefix(reverseString(lopts.Suffix), m.radixTreeCallBackKeysOnly(lopts.Offset, lopts.Limit, &records, expiredElements))
	}
	m.lockGlob.RUnlock()

	m.purgeExpired(expiredElements)
	return records, nil
}

// purgeExpired removes the given expired elements from the eviction list and
// both radix trees under the write lock. It is a no-op for an empty map.
func (m *MemStore) purgeExpired(expiredElements map[string]*list.Element) {
	if len(expiredElements) == 0 {
		return
	}
	m.lockGlob.Lock()
	defer m.lockGlob.Unlock()
	for key, element := range expiredElements {
		m.evictionList.Remove(element)
		_, _ = m.preRadix.Delete(key)
		_, _ = m.sufRadix.Delete(reverseString(key))
	}
}
// Close is a no-op; the MemStore holds no external resources.
func (m *MemStore) Close() error {
	return nil
}
// String returns the name identifying this store implementation.
func (m *MemStore) String() string {
	return "RadixMemStore"
}
// Len returns the number of elements currently held and whether the internal
// structures (eviction list and both radix trees) agree on that number.
// If they disagree — which would indicate an internal inconsistency — it
// returns 0 and false.
func (m *MemStore) Len() (int, bool) {
	// take the read lock: the eviction list and the radix trees may be
	// mutated concurrently by Write / Read / Delete, and the previous
	// unsynchronized reads were a data race
	m.lockGlob.RLock()
	defer m.lockGlob.RUnlock()

	eLen := m.evictionList.Len()
	pLen := m.preRadix.Len()
	sLen := m.sufRadix.Len()
	if eLen == pLen && eLen == sLen {
		return eLen, true
	}
	return 0, false
}
// radixTreeCallBack returns a radix.WalkFn that collects (copies of) the
// non-expired records within the [offset, offset+limit) window into result,
// and gathers expired elements into expiredElements for later removal.
// A limit of 0 means "no limit".
func (m *MemStore) radixTreeCallBack(offset, limit uint, result *[]*store.Record, expiredElements map[string]*list.Element) radix.WalkFn {
	// Go closures capture variables by reference, so plain locals persist
	// across invocations of the returned callback; the previous new(uint)
	// pointer indirection was unnecessary.
	var currentIndex uint
	maxIndex := offset + limit
	unlimited := limit == 0
	return func(key string, value interface{}) bool {
		element := value.(*list.Element)
		record := element.Value.(*storeRecord)
		if record.Expiry != 0 && record.ExpiresAt.Before(time.Now()) {
			// record has expired -> remember it for deletion and continue
			// with the next element without increasing the index
			expiredElements[record.Key] = element
			return false
		}
		if currentIndex >= offset && (unlimited || currentIndex < maxIndex) {
			// within the requested window: add a copy to the results
			*result = append(*result, fromStoreRecord(record))
		}
		currentIndex++
		// returning true stops the walk once the limit has been reached
		return !unlimited && currentIndex >= maxIndex
	}
}
// radixTreeCallBackCheckSuffix behaves like radixTreeCallBack but also skips
// any key that does not end with the given suffix; skipped keys do not
// consume an index slot. A limit of 0 means "no limit".
func (m *MemStore) radixTreeCallBackCheckSuffix(offset, limit uint, suffix string, result *[]*store.Record, expiredElements map[string]*list.Element) radix.WalkFn {
	// Go closures capture variables by reference, so plain locals persist
	// across invocations of the returned callback; the previous new(uint)
	// pointer indirection was unnecessary.
	var currentIndex uint
	maxIndex := offset + limit
	unlimited := limit == 0
	return func(key string, value interface{}) bool {
		if !strings.HasSuffix(key, suffix) {
			return false
		}
		element := value.(*list.Element)
		record := element.Value.(*storeRecord)
		if record.Expiry != 0 && record.ExpiresAt.Before(time.Now()) {
			// record has expired -> remember it for deletion and continue
			// with the next element without increasing the index
			expiredElements[record.Key] = element
			return false
		}
		if currentIndex >= offset && (unlimited || currentIndex < maxIndex) {
			// within the requested window: add a copy to the results
			*result = append(*result, fromStoreRecord(record))
		}
		currentIndex++
		// returning true stops the walk once the limit has been reached
		return !unlimited && currentIndex >= maxIndex
	}
}
// radixTreeCallBackKeysOnly returns a radix.WalkFn that collects the keys of
// the non-expired records within the [offset, offset+limit) window into
// result, and gathers expired elements into expiredElements for later
// removal. A limit of 0 means "no limit".
func (m *MemStore) radixTreeCallBackKeysOnly(offset, limit uint, result *[]string, expiredElements map[string]*list.Element) radix.WalkFn {
	// Go closures capture variables by reference, so plain locals persist
	// across invocations of the returned callback; the previous new(uint)
	// pointer indirection was unnecessary.
	var currentIndex uint
	maxIndex := offset + limit
	unlimited := limit == 0
	return func(key string, value interface{}) bool {
		element := value.(*list.Element)
		record := element.Value.(*storeRecord)
		if record.Expiry != 0 && record.ExpiresAt.Before(time.Now()) {
			// record has expired -> remember it for deletion and continue
			// with the next element without increasing the index
			expiredElements[record.Key] = element
			return false
		}
		if currentIndex >= offset && (unlimited || currentIndex < maxIndex) {
			*result = append(*result, record.Key)
		}
		currentIndex++
		// returning true stops the walk once the limit has been reached
		return !unlimited && currentIndex >= maxIndex
	}
}
// radixTreeCallBackKeysOnlyWithSuffix behaves like radixTreeCallBackKeysOnly
// but also skips any key that does not end with the given suffix; skipped
// keys do not consume an index slot. A limit of 0 means "no limit".
func (m *MemStore) radixTreeCallBackKeysOnlyWithSuffix(offset, limit uint, suffix string, result *[]string, expiredElements map[string]*list.Element) radix.WalkFn {
	// Go closures capture variables by reference, so plain locals persist
	// across invocations of the returned callback; the previous new(uint)
	// pointer indirection was unnecessary.
	var currentIndex uint
	maxIndex := offset + limit
	unlimited := limit == 0
	return func(key string, value interface{}) bool {
		if !strings.HasSuffix(key, suffix) {
			return false
		}
		element := value.(*list.Element)
		record := element.Value.(*storeRecord)
		if record.Expiry != 0 && record.ExpiresAt.Before(time.Now()) {
			// record has expired -> remember it for deletion and continue
			// with the next element without increasing the index
			expiredElements[record.Key] = element
			return false
		}
		if currentIndex >= offset && (unlimited || currentIndex < maxIndex) {
			*result = append(*result, record.Key)
		}
		currentIndex++
		// returning true stops the walk once the limit has been reached
		return !unlimited && currentIndex >= maxIndex
	}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,158 @@
package memory
import (
"sync"
"go-micro.dev/v4/store"
)
// In-memory store implementation using multiple MemStore to provide support
// for multiple databases and tables.
// Each table will be mapped to its own MemStore, which will be completely
// isolated from the rest. In particular, each MemStore will have its own
// capacity, so it's possible to have 10 MemStores with full capacity (512
// by default)
//
// The options will be the same for all MemStores unless they're explicitly
// initialized otherwise.
//
// Since each MemStore is isolated, the required synchronization caused by
// concurrency will be minimal if the threads use different tables
type MultiMemStore struct {
	// storeMap maps a "<database>/<table>" prefix to its backing MemStore
	storeMap map[string]*MemStore
	// storeMapLock guards concurrent access to storeMap
	storeMapLock sync.RWMutex
	// genOpts holds the options applied to every newly created MemStore
	genOpts []store.Option
}
// NewMultiMemStore creates a MultiMemStore. A new MemStore will be mapped
// based on the options. A default MemStore will be mapped if no Database and
// Table aren't used.
func NewMultiMemStore(opts ...store.Option) store.Store {
	multi := &MultiMemStore{
		storeMap: map[string]*MemStore{},
		genOpts:  opts,
	}
	_ = multi.Init(opts...)
	return multi
}
// getMemStore returns the MemStore mapped to the given "<database>/<table>"
// prefix, lazily creating (and registering) one with the store's generic
// options on first use.
func (m *MultiMemStore) getMemStore(prefix string) *MemStore {
	// fast path: the store already exists
	m.storeMapLock.RLock()
	mStore, exists := m.storeMap[prefix]
	m.storeMapLock.RUnlock()
	if exists {
		return mStore
	}

	m.storeMapLock.Lock()
	defer m.storeMapLock.Unlock()
	// BUGFIX: re-check under the write lock. Previously two goroutines could
	// both miss under the read lock and each create a store; the second map
	// assignment overwrote the first, so writes done through the first
	// (now orphaned) store were silently lost.
	if mStore, exists = m.storeMap[prefix]; exists {
		return mStore
	}
	newStore := NewMemStore(m.genOpts...).(*MemStore)
	m.storeMap[prefix] = newStore
	return newStore
}
// Init initializes the mapped MemStore selected by the Database and Table
// values from the options, passing the same options along. The target
// MemStore will be reinitialized if needed.
func (m *MultiMemStore) Init(opts ...store.Option) error {
	optList := store.Options{}
	for _, o := range opts {
		o(&optList)
	}
	target := m.getMemStore(optList.Database + "/" + optList.Table)
	return target.Init(opts...)
}
// Options returns the options used to create the MultiMemStore.
// Specific options for each MemStore aren't available.
func (m *MultiMemStore) Options() store.Options {
	result := store.Options{}
	for _, o := range m.genOpts {
		o(&result)
	}
	return result
}
// Write stores the record in the target MemStore selected by the Database
// and Table values from the options. A default MemStore will be used if no
// Database and Table options are provided.
// The write options will be forwarded to the target MemStore.
func (m *MultiMemStore) Write(r *store.Record, opts ...store.WriteOption) error {
	wopts := store.WriteOptions{}
	for _, o := range opts {
		o(&wopts)
	}
	target := m.getMemStore(wopts.Database + "/" + wopts.Table)
	return target.Write(r, opts...)
}
// Read returns the matching records from the target MemStore selected by the
// Database and Table values from the options. A default MemStore will be
// used if no Database and Table options are provided.
// The read options will be forwarded to the target MemStore.
//
// The expectations regarding the results (sort order, eviction policies,
// etc) will be the same as the target MemStore.
func (m *MultiMemStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
	ropts := store.ReadOptions{}
	for _, o := range opts {
		o(&ropts)
	}
	target := m.getMemStore(ropts.Database + "/" + ropts.Table)
	return target.Read(key, opts...)
}
// Delete removes the matching records from the target MemStore selected by
// the Database and Table values from the options. A default MemStore will be
// used if no Database and Table options are provided.
//
// Matching records from other Tables won't be affected. In fact, we won't
// access to other Tables.
func (m *MultiMemStore) Delete(key string, opts ...store.DeleteOption) error {
	dopts := store.DeleteOptions{}
	for _, o := range opts {
		o(&dopts)
	}
	target := m.getMemStore(dopts.Database + "/" + dopts.Table)
	return target.Delete(key, opts...)
}
// List returns the keys of the target MemStore selected by the Database and
// Table values from the options. A default MemStore will be used if no
// Database and Table options are provided.
// The list options will be forwarded to the target MemStore.
func (m *MultiMemStore) List(opts ...store.ListOption) ([]string, error) {
	lopts := store.ListOptions{}
	for _, o := range opts {
		o(&lopts)
	}
	target := m.getMemStore(lopts.Database + "/" + lopts.Table)
	return target.List(opts...)
}
// Close is a no-op; the MultiMemStore and its MemStores hold no external
// resources.
func (m *MultiMemStore) Close() error {
	return nil
}
// String returns the name identifying this store implementation.
func (m *MultiMemStore) String() string {
	return "MultiRadixMemStore"
}

View File

@@ -0,0 +1,172 @@
package memory
import (
"context"
"strconv"
"testing"
"go-micro.dev/v4/store"
)
// TestWriteReadTables checks that writes under the same key to different
// database/table combinations are isolated: each table returns its own value.
func TestWriteReadTables(t *testing.T) {
	cache := NewMultiMemStore()
	record1 := &store.Record{
		Key:   "sameKey",
		Value: []byte("from record1"),
	}
	record2 := &store.Record{
		Key:   "sameKey",
		Value: []byte("from record2"),
	}
	// same key, but record2 goes to a different database / table
	_ = cache.Write(record1)
	_ = cache.Write(record2, store.WriteTo("DB02", "Table02"))
	// the default table must only see record1
	records1, _ := cache.Read("sameKey")
	if len(records1) != 1 {
		t.Fatalf("Wrong number of records, expected 1, got %d", len(records1))
	}
	if records1[0].Key != "sameKey" {
		t.Errorf("Wrong key, expected \"sameKey\", got %s", records1[0].Key)
	}
	if string(records1[0].Value) != "from record1" {
		t.Errorf("Wrong value, expected \"from record1\", got %s", string(records1[0].Value))
	}
	// DB02/Table02 must only see record2
	records2, _ := cache.Read("sameKey", store.ReadFrom("DB02", "Table02"))
	if len(records2) != 1 {
		t.Fatalf("Wrong number of records, expected 1, got %d", len(records2))
	}
	if records2[0].Key != "sameKey" {
		t.Errorf("Wrong key, expected \"sameKey\", got %s", records2[0].Key)
	}
	if string(records2[0].Value) != "from record2" {
		t.Errorf("Wrong value, expected \"from record2\", got %s", string(records2[0].Value))
	}
}
// TestDeleteTables checks that deleting a key from the default table does
// not affect the same key stored in a different database/table.
func TestDeleteTables(t *testing.T) {
	cache := NewMultiMemStore()
	record1 := &store.Record{
		Key:   "sameKey",
		Value: []byte("from record1"),
	}
	record2 := &store.Record{
		Key:   "sameKey",
		Value: []byte("from record2"),
	}
	// same key in two different tables
	_ = cache.Write(record1)
	_ = cache.Write(record2, store.WriteTo("DB02", "Table02"))
	// sanity check: both tables hold their own value before the delete
	records1, _ := cache.Read("sameKey")
	if len(records1) != 1 {
		t.Fatalf("Wrong number of records, expected 1, got %d", len(records1))
	}
	if records1[0].Key != "sameKey" {
		t.Errorf("Wrong key, expected \"sameKey\", got %s", records1[0].Key)
	}
	if string(records1[0].Value) != "from record1" {
		t.Errorf("Wrong value, expected \"from record1\", got %s", string(records1[0].Value))
	}
	records2, _ := cache.Read("sameKey", store.ReadFrom("DB02", "Table02"))
	if len(records2) != 1 {
		t.Fatalf("Wrong number of records, expected 1, got %d", len(records2))
	}
	if records2[0].Key != "sameKey" {
		t.Errorf("Wrong key, expected \"sameKey\", got %s", records2[0].Key)
	}
	if string(records2[0].Value) != "from record2" {
		t.Errorf("Wrong value, expected \"from record2\", got %s", string(records2[0].Value))
	}
	// delete only touches the default table
	_ = cache.Delete("sameKey")
	if _, err := cache.Read("sameKey"); err != store.ErrNotFound {
		t.Errorf("Key \"sameKey\" still exists after deletion")
	}
	// DB02/Table02 must still hold record2 untouched
	records2, _ = cache.Read("sameKey", store.ReadFrom("DB02", "Table02"))
	if len(records2) != 1 {
		t.Fatalf("Wrong number of records, expected 1, got %d", len(records2))
	}
	if records2[0].Key != "sameKey" {
		t.Errorf("Wrong key, expected \"sameKey\", got %s", records2[0].Key)
	}
	if string(records2[0].Value) != "from record2" {
		t.Errorf("Wrong value, expected \"from record2\", got %s", string(records2[0].Value))
	}
}
// TestListTables checks that listing a specific database/table only returns
// the keys written to that table.
func TestListTables(t *testing.T) {
	cache := NewMultiMemStore()
	_ = cache.Write(&store.Record{
		Key:   "key001",
		Value: []byte("from record1"),
	})
	_ = cache.Write(&store.Record{
		Key:   "key002",
		Value: []byte("from record2"),
	}, store.WriteTo("DB02", "Table02"))

	keys, _ := cache.List(store.ListFrom("DB02", "Table02"))
	if len(keys) != 1 {
		t.Fatalf("Wrong number of keys, expected 1, got %d", len(keys))
	}
	expectedKeys := []string{"key002"}
	for index, key := range keys {
		if key != expectedKeys[index] {
			t.Errorf("Wrong key for index %d, expected %s, got %s", index, expectedKeys[index], key)
		}
	}
}
// TestWriteSizeLimit checks that each table enforces its own capacity
// ("maxCap") independently, evicting the oldest entries first.
func TestWriteSizeLimit(t *testing.T) {
	cache := NewMultiMemStore(
		store.WithContext(
			NewContext(
				context.Background(),
				map[string]interface{}{
					"maxCap": 2,
				},
			),
		),
	)
	// write 4 entries into each table; with maxCap 2 only the last two survive
	for i := 0; i < 4; i++ {
		v := strconv.Itoa(i)
		rec := &store.Record{Key: v, Value: []byte(v)}
		_ = cache.Write(rec)
		_ = cache.Write(rec, store.WriteTo("DB02", "Table02"))
	}

	keys1, _ := cache.List()
	keys2, _ := cache.List(store.ListFrom("DB02", "Table02"))
	expectedKeys := []string{"2", "3"}
	for _, keys := range [][]string{keys1, keys2} {
		if len(keys) != 2 {
			t.Fatalf("Wrong number of keys, expected 2, got %d", len(keys))
		}
		for index, key := range keys {
			if expectedKeys[index] != key {
				t.Errorf("Wrong key for index %d, expected %s, got %s", index, expectedKeys[index], key)
			}
		}
	}
}

View File

@@ -0,0 +1,63 @@
package memory
import (
"time"
"go-micro.dev/v4/store"
)
// toStoreRecord converts a go-micro record into the internal representation,
// deep-copying value and metadata and resolving the effective expiration.
// Precedence: TTL write option > Expiry write option > TTL carried by the
// record itself.
func toStoreRecord(src *store.Record, options store.WriteOptions) *storeRecord {
	dst := &storeRecord{}
	dst.Key = src.Key
	dst.Value = make([]byte, len(src.Value))
	copy(dst.Value, src.Value)
	dst.Metadata = make(map[string]interface{})
	for k, v := range src.Metadata {
		dst.Metadata[k] = v
	}
	// resolve the effective expiration following the documented precedence
	switch {
	case options.TTL != 0:
		dst.Expiry = options.TTL
		dst.ExpiresAt = time.Now().Add(options.TTL)
	case !options.Expiry.IsZero():
		// options.Expiry is an absolute time, dst.Expiry is a duration
		dst.Expiry = time.Until(options.Expiry)
		dst.ExpiresAt = options.Expiry
	case src.Expiry != 0:
		dst.Expiry = src.Expiry
		dst.ExpiresAt = time.Now().Add(src.Expiry)
	}
	return dst
}
// fromStoreRecord converts the internal record back into a go-micro record,
// deep-copying value and metadata. The remaining TTL is recomputed from the
// stored absolute expiration time.
func fromStoreRecord(src *storeRecord) *store.Record {
	dst := &store.Record{}
	dst.Key = src.Key
	dst.Value = make([]byte, len(src.Value))
	copy(dst.Value, src.Value)
	if src.Expiry != 0 {
		// remaining time until the stored deadline
		dst.Expiry = time.Until(src.ExpiresAt)
	}
	dst.Metadata = make(map[string]interface{})
	for key, val := range src.Metadata {
		dst.Metadata[key] = val
	}
	return dst
}
// reverseString returns s with its runes in reverse order. Working on runes
// keeps multi-byte (UTF-8) characters intact.
func reverseString(s string) string {
	runes := []rune(s)
	reversed := make([]rune, len(runes))
	for i, r := range runes {
		reversed[len(runes)-1-i] = r
	}
	return string(reversed)
}

91
ocis-pkg/store/store.go Normal file
View File

@@ -0,0 +1,91 @@
package store
import (
"context"
"strings"
"github.com/owncloud/ocis/v2/ocis-pkg/store/etcd"
"github.com/owncloud/ocis/v2/ocis-pkg/store/memory"
"go-micro.dev/v4/store"
)
var (
	// names of the environment variables used to configure the store
	storeEnv        = "OCIS_STORE"
	storeAddressEnv = "OCIS_STORE_ADDRESS"
	storeOCMemSize  = "OCIS_STORE_OCMEM_SIZE"
)

// ocMemStore holds the process-wide singleton for the "ocmem" implementation
// so all callers share the same cache instance.
var ocMemStore *store.Store

// OcisStoreOptions selects and configures the key-value store returned by
// GetStore.
type OcisStoreOptions struct {
	// Type selects the implementation: "noop", "etcd", "ocmem" or "memory"
	Type string
	// Address is a comma-separated list of nodes (only used by etcd)
	Address string
	// Size is the maximum number of items per table (only used by ocmem)
	Size int
}
// Get the configured key-value store to be used.
//
// Each microservice (or whatever piece is using the store) should use the
// options available in the interface's operations to choose the right database
// and table to prevent collisions with other microservices.
// Recommended approach is to use "services" or "ocis-pkg" for the database,
// and "services/<service-name>/" or "ocis-pkg/<pkg>/" for the package name.
//
// So far, only the name of the store and the node addresses are configurable
// via environment variables.
// Available options for "OCIS_STORE" are:
// * "noop", for a noop store (it does nothing)
// * "etcd", for etcd
// * "ocmem", custom in-memory implementation, with fixed size and optimized prefix
// and suffix search
// * "memory", for a in-memory implementation, which is the default if noone matches
//
// "OCIS_STORE_ADDRESS" is a comma-separated list of nodes that the store
// will use. This is currently usable only with the etcd implementation. If it
// isn't provided, "127.0.0.1:2379" will be the only node used.
//
// "OCIS_STORE_OCMEM_SIZE" will configure the maximum capacity of the cache for
// the "ocmem" implementation, in number of items that the cache can hold per table.
// You can use "OCIS_STORE_OCMEM_SIZE=5000" so the cache will hold up to 5000 elements.
// The parameter only affects to the "ocmem" implementation, the rest will ignore it.
// If an invalid value is used, the default will be used instead, so up to 512 elements
// the cache will hold.
func GetStore(ocisOpts OcisStoreOptions) store.Store {
	var s store.Store
	addresses := strings.Split(ocisOpts.Address, ",")
	opts := []store.Option{
		store.Nodes(addresses...),
	}
	switch ocisOpts.Type {
	case "noop":
		s = store.NewNoopStore(opts...)
	case "etcd":
		s = etcd.NewEtcdStore(opts...)
	case "ocmem":
		// the "ocmem" store is a process-wide singleton so every caller
		// shares the same cache.
		// NOTE(review): this check-then-assign on the package-level
		// ocMemStore isn't goroutine-safe; two concurrent first calls could
		// each create a store — confirm GetStore is only called during
		// single-threaded startup, or guard the initialization with sync.Once
		if ocMemStore == nil {
			var memStore store.Store
			sizeNum := ocisOpts.Size
			if sizeNum <= 0 {
				// no (or invalid) size configured: use the default capacity
				// (up to 512 elements per table)
				memStore = memory.NewMultiMemStore()
			} else {
				memStore = memory.NewMultiMemStore(
					store.WithContext(
						memory.NewContext(
							context.Background(),
							map[string]interface{}{
								"maxCap": sizeNum,
							},
						)),
				)
			}
			ocMemStore = &memStore
		}
		s = *ocMemStore
	default:
		// any other value (including "memory") falls back to the go-micro
		// in-memory store
		s = store.NewMemoryStore(opts...)
	}
	return s
}

View File

@@ -0,0 +1,8 @@
package config
// CacheStore defines the available configuration for the cache store
type CacheStore struct {
	// Type selects the store implementation
	Type string `yaml:"type" env:"OCIS_CACHE_STORE_TYPE;GRAPH_CACHE_STORE_TYPE" desc:"The type of the cache store. Valid options are \"noop\", \"ocmem\", \"etcd\" and \"memory\""`
	// Address lists the nodes to connect to (only used by etcd)
	Address string `yaml:"address" env:"OCIS_CACHE_STORE_ADDRESS;GRAPH_CACHE_STORE_ADDRESS" desc:"a comma-separated list of addresses to connect to. Only for etcd"`
	// Size caps the number of items per table (only used by ocmem)
	Size int `yaml:"size" env:"OCIS_CACHE_STORE_SIZE;GRAPH_CACHE_STORE_SIZE" desc:"Maximum size for the cache store. Only ocmem will use this option, in number of items per table. The rest will ignore the option and can grow indefinitely"`
}

View File

@@ -12,9 +12,10 @@ type Config struct {
Service Service `yaml:"-"`
Tracing *Tracing `yaml:"tracing"`
Log *Log `yaml:"log"`
Debug Debug `yaml:"debug"`
Tracing *Tracing `yaml:"tracing"`
Log *Log `yaml:"log"`
CacheStore *CacheStore `yaml:"cache_store"`
Debug Debug `yaml:"debug"`
HTTP HTTP `yaml:"http"`

View File

@@ -96,6 +96,16 @@ func EnsureDefaults(cfg *config.Config) {
cfg.Tracing = &config.Tracing{}
}
if cfg.CacheStore == nil && cfg.Commons != nil && cfg.Commons.CacheStore != nil {
cfg.CacheStore = &config.CacheStore{
Type: cfg.Commons.CacheStore.Type,
Address: cfg.Commons.CacheStore.Address,
Size: cfg.Commons.CacheStore.Size,
}
} else if cfg.CacheStore == nil {
cfg.CacheStore = &config.CacheStore{}
}
if cfg.TokenManager == nil && cfg.Commons != nil && cfg.Commons.TokenManager != nil {
cfg.TokenManager = &config.TokenManager{
JWTSecret: cfg.Commons.TokenManager.JWTSecret,

View File

@@ -6,7 +6,6 @@ import (
"io/ioutil"
"net/http"
"strconv"
"time"
"github.com/ReneKroon/ttlcache/v2"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
@@ -16,6 +15,7 @@ import (
ocisldap "github.com/owncloud/ocis/v2/ocis-pkg/ldap"
"github.com/owncloud/ocis/v2/ocis-pkg/roles"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/store"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/graph/pkg/identity"
"github.com/owncloud/ocis/v2/services/graph/pkg/identity/ldap"
@@ -144,9 +144,13 @@ func NewService(opts ...Option) Service {
roleManager := options.RoleManager
if roleManager == nil {
storeOptions := store.OcisStoreOptions{
Type: options.Config.CacheStore.Type,
Address: options.Config.CacheStore.Address,
Size: options.Config.CacheStore.Size,
}
m := roles.NewManager(
roles.CacheSize(1024),
roles.CacheTTL(time.Hour),
roles.StoreOptions(storeOptions),
roles.Logger(options.Logger),
roles.RoleService(svc.roleService),
)

View File

@@ -0,0 +1,8 @@
package config
// CacheStore defines the available configuration for the cache store
type CacheStore struct {
	// Type selects the store implementation
	Type string `yaml:"type" env:"OCIS_CACHE_STORE_TYPE;OCS_CACHE_STORE_TYPE" desc:"The type of the cache store. Valid options are \"noop\", \"ocmem\", \"etcd\" and \"memory\""`
	// Address lists the nodes to connect to (only used by etcd)
	Address string `yaml:"address" env:"OCIS_CACHE_STORE_ADDRESS;OCS_CACHE_STORE_ADDRESS" desc:"a comma-separated list of addresses to connect to. Only for etcd"`
	// Size caps the number of items per table (only used by ocmem)
	Size int `yaml:"size" env:"OCIS_CACHE_STORE_SIZE;OCS_CACHE_STORE_SIZE" desc:"Maximum size for the cache store. Only ocmem will use this option, in number of items per table. The rest will ignore the option and can grow indefinitely"`
}

View File

@@ -12,9 +12,10 @@ type Config struct {
Service Service `yaml:"-"`
Tracing *Tracing `yaml:"tracing"`
Log *Log `yaml:"log"`
Debug Debug `yaml:"debug"`
Tracing *Tracing `yaml:"tracing"`
Log *Log `yaml:"log"`
CacheStore *CacheStore `yaml:"cache_store"`
Debug Debug `yaml:"debug"`
HTTP HTTP `yaml:"http"`

View File

@@ -69,6 +69,16 @@ func EnsureDefaults(cfg *config.Config) {
cfg.Tracing = &config.Tracing{}
}
if cfg.CacheStore == nil && cfg.Commons != nil && cfg.Commons.CacheStore != nil {
cfg.CacheStore = &config.CacheStore{
Type: cfg.Commons.CacheStore.Type,
Address: cfg.Commons.CacheStore.Address,
Size: cfg.Commons.CacheStore.Size,
}
} else if cfg.CacheStore == nil {
cfg.CacheStore = &config.CacheStore{}
}
if cfg.Reva == nil && cfg.Commons != nil && cfg.Commons.Reva != nil {
cfg.Reva = &config.Reva{
Address: cfg.Commons.Reva.Address,

View File

@@ -2,10 +2,10 @@ package svc
import (
"net/http"
"time"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/store"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
@@ -42,9 +42,13 @@ func NewService(opts ...Option) Service {
}
roleManager := options.RoleManager
if roleManager == nil {
storeOptions := store.OcisStoreOptions{
Type: options.Config.CacheStore.Type,
Address: options.Config.CacheStore.Address,
Size: options.Config.CacheStore.Size,
}
m := roles.NewManager(
roles.CacheSize(1024),
roles.CacheTTL(time.Hour*24*7),
roles.StoreOptions(storeOptions),
roles.Logger(options.Logger),
roles.RoleService(roleService),
)