remove locking from accounts service

add a cached named rwlock pkg
use sync.map in the cache pkg
use named rwlock in indexer pkg
use sync.map in indexer pkg
remove husky
This commit is contained in:
Florian Schade
2021-01-14 22:07:58 +01:00
parent f97f0d7342
commit a02fb890f7
11 changed files with 10554 additions and 131 deletions

View File

@@ -11,7 +11,6 @@ import (
"path"
"regexp"
"strconv"
"sync"
"time"
"github.com/owncloud/ocis/ocis-pkg/log"
@@ -33,11 +32,8 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)
// accLock mutually exclude readers from writers on account files
var accLock sync.Mutex
// passwordValidCache caches basic auth password validations
var passwordValidCache = cache.NewCache(cache.Size(1024))
var passwordValidCache = cache.NewCache(1024)
// passwordValidCacheExpiration defines the entry lifetime
const passwordValidCacheExpiration = 10 * time.Minute
@@ -133,9 +129,6 @@ func (s Service) ListAccounts(ctx context.Context, in *proto.ListAccountsRequest
}
onlySelf := hasSelf && !hasManagement
accLock.Lock()
defer accLock.Unlock()
teardownServiceUser := s.serviceUserToIndex()
defer teardownServiceUser()
match, authRequest := getAuthQueryMatch(in.Query)
@@ -172,11 +165,8 @@ func (s Service) ListAccounts(ctx context.Context, in *proto.ListAccountsRequest
// - it checks if the cache already contains an entry that matches found account Id // account PasswordProfile.LastPasswordChangeDateTime (k)
// - if no entry exists it runs the bcrypt.CompareHashAndPassword as before and if everything is ok it stores the
// result by the (k) as key and (v) as value. If not it errors
// - if a entry is found it checks if the given value matches (v). If it doesnt match the cache entry gets removed
// - if a entry is found it checks if the given value matches (v). If it doesnt match, the cache entry gets removed
// and it errors.
//
// if many concurrent requests from the same user come in within a short period of time, it's possible that e is still nil.
// This is why all this needs to be wrapped within a sync.Mutex locking to prevent calling bcrypt.CompareHashAndPassword unnecessarily too often.
{
var suspicious bool
@@ -284,8 +274,6 @@ func (s Service) GetAccount(ctx context.Context, in *proto.GetAccountRequest, ou
}
onlySelf := hasSelf && !hasManagement
accLock.Lock()
defer accLock.Unlock()
var id string
if id, err = cleanupID(in.Id); err != nil {
return merrors.InternalServerError(s.id, "could not clean up account id: %v", err.Error())
@@ -331,8 +319,6 @@ func (s Service) CreateAccount(ctx context.Context, in *proto.CreateAccountReque
return merrors.Forbidden(s.id, "no permission for CreateAccount")
}
accLock.Lock()
defer accLock.Unlock()
var id string
if in.Account == nil {
@@ -468,8 +454,6 @@ func (s Service) UpdateAccount(ctx context.Context, in *proto.UpdateAccountReque
}
onlySelf := hasSelf && !hasManagement
accLock.Lock()
defer accLock.Unlock()
var id string
if in.Account == nil {
return merrors.BadRequest(s.id, "account missing")
@@ -633,8 +617,6 @@ func (s Service) DeleteAccount(ctx context.Context, in *proto.DeleteAccountReque
return merrors.Forbidden(s.id, "no permission for DeleteAccount")
}
accLock.Lock()
defer accLock.Unlock()
var id string
if id, err = cleanupID(in.Id); err != nil {
return merrors.InternalServerError(s.id, "could not clean up account id: %v", err.Error())

View File

@@ -0,0 +1,16 @@
Enhancement: remove locking from accounts service
Tags: ocis
In the past we locked every request in the accounts service. This is problematic as soon as we start to hammer the system with many users at the same time.
The locking is now removed from the accounts service and is moved to the indexer.
Instead of doing locking for reads and writes we now differentiate them by using a named RWLock.
- remove locking from accounts service
- add a cached named rwlock pkg
- use sync.map in the cache pkg
- use named rwlock in indexer pkg
- use sync.map in indexer pkg
https://github.com/owncloud/ocis/issues/966

View File

@@ -13,68 +13,64 @@ type Entry struct {
// Cache is a barebones cache implementation.
type Cache struct {
entries map[string]*Entry
size int
m sync.Mutex
sync.Map
sizeTotal int
sizeCurrent int
}
// NewCache returns a new instance of Cache.
func NewCache(o ...Option) Cache {
opts := newOptions(o...)
func NewCache(sizeTotal int) Cache {
return Cache{
size: opts.size,
entries: map[string]*Entry{},
sizeTotal: sizeTotal,
}
}
// Get gets an entry by given key
func (c *Cache) Get(k string) *Entry {
c.m.Lock()
defer c.m.Unlock()
if _, ok := c.entries[k]; ok {
if c.expired(c.entries[k]) {
delete(c.entries, k)
if sme, ok := c.Load(k); ok {
e := sme.(*Entry)
if c.expired(e) {
c.Delete(k)
return nil
}
return c.entries[k]
return e
}
return nil
}
// Set adds an entry for given key and value
func (c *Cache) Set(k string, val interface{}, expiration time.Time) {
c.m.Lock()
defer c.m.Unlock()
if !c.fits() {
c.evict()
}
c.entries[k] = &Entry{
c.Store(k, &Entry{
val,
expiration,
}
})
c.sizeCurrent++
}
// Unset removes an entry by given key
func (c *Cache) Unset(k string) bool {
if _, ok := c.entries[k]; !ok {
if _, ok := c.Load(k); !ok {
return false
}
delete(c.entries, k)
c.Delete(k)
c.sizeCurrent--
return true
}
// evict frees memory from the cache by removing entries that exceeded the cache TTL.
func (c *Cache) evict() {
for i := range c.entries {
if c.expired(c.entries[i]) {
delete(c.entries, i)
c.Range(func(k, sme interface{}) bool {
e := sme.(*Entry)
if c.expired(e) {
c.Delete(k)
c.sizeCurrent--
}
}
return true
})
}
// expired checks if an entry is expired
@@ -84,5 +80,5 @@ func (c *Cache) expired(e *Entry) bool {
// fits returns whether the cache fits more entries.
func (c *Cache) fits() bool {
return c.size > len(c.entries)
return c.sizeTotal > c.sizeCurrent
}

View File

@@ -1,36 +0,0 @@
package cache
import "time"
// Options are all the possible options.
type Options struct {
size int
ttl time.Duration
}
// Option mutates option
type Option func(*Options)
// Size configures the size of the cache in items.
func Size(s int) Option {
return func(o *Options) {
o.size = s
}
}
// TTL rebuilds the cache after the configured duration.
func TTL(ttl time.Duration) Option {
return func(o *Options) {
o.ttl = ttl
}
}
func newOptions(opts ...Option) Options {
o := Options{}
for _, v := range opts {
v(&o)
}
return o
}

View File

@@ -3,9 +3,6 @@ package indexer
import (
"fmt"
"path"
"strings"
"github.com/CiscoM31/godata"
"github.com/iancoleman/strcase"
"github.com/owncloud/ocis/ocis-pkg/indexer/config"
@@ -15,12 +12,17 @@ import (
_ "github.com/owncloud/ocis/ocis-pkg/indexer/index/disk" // to populate index
"github.com/owncloud/ocis/ocis-pkg/indexer/option"
"github.com/owncloud/ocis/ocis-pkg/indexer/registry"
"github.com/owncloud/ocis/ocis-pkg/sync"
"path"
"strings"
)
// Indexer is a facade to configure and query over multiple indices.
type Indexer struct {
config *config.Config
indices typeMap
mu sync.NRWMutex
}
// IdxAddResult represents the result of an Add call on an index
@@ -33,6 +35,7 @@ func CreateIndexer(cfg *config.Config) *Indexer {
return &Indexer{
config: cfg,
indices: typeMap{},
mu: sync.NewNRWMutex(),
}
}
@@ -45,9 +48,9 @@ func getRegistryStrategy(cfg *config.Config) string {
}
// Reset takes care of deleting all indices from storage and from the internal map of indices
func (i Indexer) Reset() error {
for j := range i.indices {
for _, indices := range i.indices[j].IndicesByField {
func (i *Indexer) Reset() error {
for j := range i.indices.allTypeMappings() {
for _, indices := range i.indices.getTypeMapping(j).IndicesByField {
for _, idx := range indices {
err := idx.Delete()
if err != nil {
@@ -55,29 +58,19 @@ func (i Indexer) Reset() error {
}
}
}
delete(i.indices, j)
i.indices.deleteTypeMapping(j)
}
return nil
}
// AddIndex adds a new index to the indexer receiver.
func (i Indexer) AddIndex(t interface{}, indexBy, pkName, entityDirName, indexType string, bound *option.Bound, caseInsensitive bool) error {
func (i *Indexer) AddIndex(t interface{}, indexBy, pkName, entityDirName, indexType string, bound *option.Bound, caseInsensitive bool) error {
strategy := getRegistryStrategy(i.config)
f := registry.IndexConstructorRegistry[strategy][indexType]
var idx index.Index
if strategy == "disk" {
idx = f(
option.CaseInsensitive(caseInsensitive),
option.WithEntity(t),
option.WithBounds(bound),
option.WithTypeName(getTypeFQN(t)),
option.WithIndexBy(indexBy),
option.WithFilesDir(path.Join(i.config.Repo.Disk.Path, entityDirName)),
option.WithDataDir(i.config.Repo.Disk.Path),
)
} else if strategy == "cs3" {
if strategy == "cs3" {
idx = f(
option.CaseInsensitive(caseInsensitive),
option.WithEntity(t),
@@ -90,6 +83,16 @@ func (i Indexer) AddIndex(t interface{}, indexBy, pkName, entityDirName, indexTy
option.WithProviderAddr(i.config.Repo.CS3.ProviderAddr),
option.WithServiceUser(i.config.ServiceUser),
)
} else {
idx = f(
option.CaseInsensitive(caseInsensitive),
option.WithEntity(t),
option.WithBounds(bound),
option.WithTypeName(getTypeFQN(t)),
option.WithIndexBy(indexBy),
option.WithFilesDir(path.Join(i.config.Repo.Disk.Path, entityDirName)),
option.WithDataDir(i.config.Repo.Disk.Path),
)
}
i.indices.addIndex(getTypeFQN(t), pkName, idx)
@@ -97,10 +100,14 @@ func (i Indexer) AddIndex(t interface{}, indexBy, pkName, entityDirName, indexTy
}
// Add a new entry to the indexer
func (i Indexer) Add(t interface{}) ([]IdxAddResult, error) {
func (i *Indexer) Add(t interface{}) ([]IdxAddResult, error) {
typeName := getTypeFQN(t)
i.mu.Lock(typeName)
defer i.mu.Unlock(typeName)
var results []IdxAddResult
if fields, ok := i.indices[typeName]; ok {
if fields := i.indices.getTypeMapping(typeName); fields != nil {
for _, indices := range fields.IndicesByField {
for _, idx := range indices {
pkVal := valueOf(t, fields.PKFieldName)
@@ -121,10 +128,14 @@ func (i Indexer) Add(t interface{}) ([]IdxAddResult, error) {
}
// FindBy finds a value on an index by field and value.
func (i Indexer) FindBy(t interface{}, field string, val string) ([]string, error) {
func (i *Indexer) FindBy(t interface{}, field string, val string) ([]string, error) {
typeName := getTypeFQN(t)
i.mu.RLock(typeName)
defer i.mu.RUnlock(typeName)
resultPaths := make([]string, 0)
if fields, ok := i.indices[typeName]; ok {
if fields := i.indices.getTypeMapping(typeName); fields != nil {
for _, idx := range fields.IndicesByField[strcase.ToCamel(field)] {
idxVal := val
res, err := idx.Lookup(idxVal)
@@ -152,9 +163,13 @@ func (i Indexer) FindBy(t interface{}, field string, val string) ([]string, erro
}
// Delete deletes all indexed fields of a given type t on the Indexer.
func (i Indexer) Delete(t interface{}) error {
func (i *Indexer) Delete(t interface{}) error {
typeName := getTypeFQN(t)
if fields, ok := i.indices[typeName]; ok {
i.mu.Lock(typeName)
defer i.mu.Unlock(typeName)
if fields := i.indices.getTypeMapping(typeName); fields != nil {
for _, indices := range fields.IndicesByField {
for _, idx := range indices {
pkVal := valueOf(t, fields.PKFieldName)
@@ -170,10 +185,14 @@ func (i Indexer) Delete(t interface{}) error {
}
// FindByPartial allows for glob search across all indexes.
func (i Indexer) FindByPartial(t interface{}, field string, pattern string) ([]string, error) {
func (i *Indexer) FindByPartial(t interface{}, field string, pattern string) ([]string, error) {
typeName := getTypeFQN(t)
i.mu.RLock(typeName)
defer i.mu.RUnlock(typeName)
resultPaths := make([]string, 0)
if fields, ok := i.indices[typeName]; ok {
if fields := i.indices.getTypeMapping(typeName); fields != nil {
for _, idx := range fields.IndicesByField[strcase.ToCamel(field)] {
res, err := idx.Search(pattern)
if err != nil {
@@ -201,14 +220,18 @@ func (i Indexer) FindByPartial(t interface{}, field string, pattern string) ([]s
}
// Update updates all indexes on a value <from> to a value <to>.
func (i Indexer) Update(from, to interface{}) error {
func (i *Indexer) Update(from, to interface{}) error {
typeNameFrom := getTypeFQN(from)
typeNameTo := getTypeFQN(to)
i.mu.Lock(typeNameFrom)
defer i.mu.Unlock(typeNameFrom)
if typeNameFrom != typeNameTo {
return fmt.Errorf("update types do not match: from %v to %v", typeNameFrom, typeNameTo)
}
if fields, ok := i.indices[typeNameFrom]; ok {
if fields := i.indices.getTypeMapping(typeNameFrom); fields != nil {
for fName, indices := range fields.IndicesByField {
oldV := valueOf(from, fName)
newV := valueOf(to, fName)
@@ -240,7 +263,7 @@ func (i Indexer) Update(from, to interface{}) error {
}
// Query parses an OData query into something our indexer.Index understands and resolves it.
func (i Indexer) Query(t interface{}, q string) ([]string, error) {
func (i *Indexer) Query(t interface{}, q string) ([]string, error) {
query, err := godata.ParseFilterString(q)
if err != nil {
return nil, err
@@ -263,7 +286,7 @@ func (i Indexer) Query(t interface{}, q string) ([]string, error) {
// conventions and be in PascalCase. For a better overview on this contemplate reading the reflection package under the
// indexer directory. Traversal of the tree happens in a pre-order fashion.
// TODO implement logic for `and` operators.
func (i Indexer) resolveTree(t interface{}, tree *queryTree, partials *[]string) error {
func (i *Indexer) resolveTree(t interface{}, tree *queryTree, partials *[]string) error {
if partials == nil {
return fmt.Errorf("return value cannot be nil: partials")
}

View File

@@ -1,27 +1,55 @@
package indexer
import "github.com/owncloud/ocis/ocis-pkg/indexer/index"
import (
"github.com/owncloud/ocis/ocis-pkg/indexer/index"
"sync"
)
// typeMap stores the indexer layout at runtime.
type typeMap map[tName]typeMapping
type tName = string
type fieldName = string
type tName = string
type typeMap struct {
sync.Map
}
type typeMapping struct {
PKFieldName string
IndicesByField map[fieldName][]index.Index
}
func (m typeMap) addIndex(typeName string, pkName string, idx index.Index) {
if val, ok := m[typeName]; ok {
val.IndicesByField[idx.IndexBy()] = append(val.IndicesByField[idx.IndexBy()], idx)
func (m *typeMap) allTypeMappings() map[tName]*typeMapping {
var rv map[tName]*typeMapping
m.Range(func(key, value interface{}) bool {
rv[key.(string)] = value.(*typeMapping)
return true
})
return rv
}
func (m *typeMap) getTypeMapping(typeName string) *typeMapping {
if value, ok := m.Load(typeName); ok {
return value.(*typeMapping)
}
return nil
}
func (m *typeMap) deleteTypeMapping(typeName string) {
m.Delete(typeName)
}
func (m *typeMap) addIndex(typeName string, pkName string, idx index.Index) {
if val, ok := m.Load(typeName); ok {
rval := val.(*typeMapping)
rval.IndicesByField[idx.IndexBy()] = append(rval.IndicesByField[idx.IndexBy()], idx)
return
}
m[typeName] = typeMapping{
m.Store(typeName, &typeMapping{
PKFieldName: pkName,
IndicesByField: map[string][]index.Index{
idx.IndexBy(): {idx},
},
}
})
}

67
ocis-pkg/sync/mutex.go Normal file
View File

@@ -0,0 +1,67 @@
package sync
import (
"sync"
)
type NRWMutex struct {
m sync.Mutex
mm map[string]*nrw
}
type nrw struct {
m sync.RWMutex
c int
}
func NewNRWMutex() NRWMutex {
return NRWMutex{mm: make(map[string]*nrw)}
}
func (c *NRWMutex) Lock(k string) {
c.m.Lock()
m := c.get(k)
m.c++
c.m.Unlock()
m.m.Lock()
}
func (c *NRWMutex) Unlock(k string) {
c.m.Lock()
defer c.m.Unlock()
m := c.get(k)
m.m.Unlock()
m.c--
if m.c == 0 {
delete(c.mm, k)
}
}
func (c *NRWMutex) RLock(k string) {
c.m.Lock()
m := c.get(k)
m.c++
c.m.Unlock()
m.m.RLock()
}
func (c *NRWMutex) RUnlock(k string) {
c.m.Lock()
defer c.m.Unlock()
m := c.get(k)
m.m.RUnlock()
m.c--
if m.c == 0 {
delete(c.mm, k)
}
}
func (c *NRWMutex) get(k string) *nrw {
m, ok := c.mm[k]
if !ok {
m = &nrw{}
c.mm[k] = m
}
return m
}

View File

@@ -0,0 +1,33 @@
package sync
import (
"fmt"
"runtime"
"testing"
)
func HammerMutex(m *NRWMutex, loops int, cdone chan bool) {
for i := 0; i < loops; i++ {
id := fmt.Sprintf("%v", i)
m.Lock(id)
m.Unlock(id)
}
cdone <- true
}
func TestMutex(t *testing.T) {
if n := runtime.SetMutexProfileFraction(1); n != 0 {
t.Logf("got mutexrate %d expected 0", n)
}
defer runtime.SetMutexProfileFraction(0)
m := NewNRWMutex()
c := make(chan bool)
r := 10
for i := 0; i < r; i++ {
go HammerMutex(&m, 2000, c)
}
for i := 0; i < r; i++ {
<-c
}
}

View File

@@ -24,7 +24,7 @@ type OIDCProvider interface {
// OIDCAuth provides a middleware to check access secured by a static token.
func OIDCAuth(optionSetters ...Option) func(next http.Handler) http.Handler {
options := newOptions(optionSetters...)
tokenCache := cache.NewCache(cache.Size(options.UserinfoCacheSize))
tokenCache := cache.NewCache(options.UserinfoCacheSize)
h := oidcAuth{
logger: options.Logger,

10320
tests/k6/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -8,11 +8,6 @@
"build": "rollup -c",
"build:w": "rollup -c -w"
},
"husky": {
"hooks": {
"pre-commit": "lint-staged"
}
},
"lint-staged": {
"*.{ts}": [
"eslint --fix"
@@ -40,7 +35,6 @@
"eslint-plugin-jest": "^23.8.2",
"eslint-plugin-prettier": "^3.2.0",
"eslint-plugin-simple-import-sort": "^6.0.1",
"husky": "^4.3.0",
"jest": "^25.4.0",
"k6": "^0.0.0",
"lint-staged": "^10.1.7",