move cache to sync package

rollback indexer map
use sync.pool for cache entries
add tests for cache
remove main locks from nrwmutex and use sync.map and sync.pool instead
bump dockerfile go version
Florian Schade
2021-01-16 16:38:01 +01:00
parent b81156fc57
commit f13530425a
12 changed files with 238 additions and 224 deletions

View File

@@ -1,11 +1,11 @@
FROM webhippie/golang:1.14 as build
FROM webhippie/golang:1.15 as build
COPY ./ /ocis/
ENV CGO_ENABLED=0
ENV GOOS=linux
RUN apk update && \
apk upgrade && \
apk upgrade --ignore musl-dev && \
apk add make gcc bash && \
rm -rf /var/cache/apk/*

View File

@@ -6,7 +6,7 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/owncloud/ocis/ocis-pkg/cache"
"github.com/owncloud/ocis/ocis-pkg/sync"
"golang.org/x/crypto/bcrypt"
"path"
"regexp"
@@ -33,7 +33,7 @@ import (
)
// passwordValidCache caches basic auth password validations
var passwordValidCache = cache.NewCache(1024)
var passwordValidCache = sync.NewCache(1024)
// passwordValidCacheExpiration defines the entry lifetime
const passwordValidCacheExpiration = 10 * time.Minute
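For context, a minimal sketch of how such a validation cache is typically consulted around a bcrypt check. The helper below is hypothetical and not taken from the accounts service; only the cache variable, expiration constant, and the imports shown in this diff are assumed.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"time"

	"github.com/owncloud/ocis/ocis-pkg/sync"
	"golang.org/x/crypto/bcrypt"
)

// Mirrors the declarations in the diff above.
var passwordValidCache = sync.NewCache(1024)

const passwordValidCacheExpiration = 10 * time.Minute

// passwordValid is a hypothetical helper: it consults the cache before paying
// the bcrypt cost again for the same hash/password pair.
func passwordValid(hash, password string) bool {
	sum := sha256.Sum256([]byte(hash + password))
	key := hex.EncodeToString(sum[:])
	if entry := passwordValidCache.Get(key); entry != nil {
		return entry.V.(bool)
	}
	valid := bcrypt.CompareHashAndPassword([]byte(hash), []byte(password)) == nil
	passwordValidCache.Set(key, valid, time.Now().Add(passwordValidCacheExpiration))
	return valid
}

func main() {
	hash, _ := bcrypt.GenerateFromPassword([]byte("secret"), bcrypt.DefaultCost)
	fmt.Println(passwordValid(string(hash), "secret")) // computed, then cached
	fmt.Println(passwordValid(string(hash), "secret")) // served from cache
}
```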

View File

@@ -1,16 +1,16 @@
Enhancement: remove locking from accounts service
Tags: ocis
Tags: accounts, ocis-pkg
In the past we locked every request in the accounts service. This is problematic as soon as we start to hammer the system with many users at the same time.
In the past we locked every request in the accounts service. This becomes problematic as soon as a larger number of concurrent requests arrives at the accounts service.
The locking is now removed from the accounts service and is moved to the indexer.
Instead of doing locking for reads and writes we now differentiate them by using a named RWLock.
- remove locking from accounts service
- add a cached named rwlock pkg
- use sync.map in the cache pkg
- use named rwlock in indexer pkg
- use sync.map in indexer pkg
- add sync package with named mutex
- add named locking to indexer
- move cache into sync pkg
https://github.com/owncloud/ocis/pull/1212
https://github.com/owncloud/ocis/issues/966
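To illustrate the read/write differentiation the changelog describes, here is a minimal sketch using the named RWLock from ocis-pkg/sync. The record-file layout and helper names are assumptions for the example, not the indexer's actual API.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"

	"github.com/owncloud/ocis/ocis-pkg/sync"
)

var mu = sync.NewNRWMutex()

func recordPath(name string) string {
	return filepath.Join(os.TempDir(), "records", name+".json")
}

// readRecord takes only the read lock for the given name, so concurrent
// reads of the same record do not block each other.
func readRecord(name string) ([]byte, error) {
	mu.RLock(name)
	defer mu.RUnlock(name)
	return ioutil.ReadFile(recordPath(name))
}

// writeRecord takes the write lock for the given name; writes to other
// records proceed independently of this one.
func writeRecord(name string, data []byte) error {
	mu.Lock(name)
	defer mu.Unlock(name)
	return ioutil.WriteFile(recordPath(name), data, 0600)
}

func main() {
	if err := os.MkdirAll(filepath.Join(os.TempDir(), "records"), 0700); err != nil {
		panic(err)
	}
	if err := writeRecord("jane", []byte(`{"id":"jane"}`)); err != nil {
		panic(err)
	}
	data, err := readRecord("jane")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}
```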

View File

@@ -1,84 +0,0 @@
package cache
import (
"sync"
"time"
)
// Entry represents an entry on the cache. You can type assert on V.
type Entry struct {
V interface{}
expiration time.Time
}
// Cache is a barebones cache implementation.
type Cache struct {
sync.Map
sizeTotal int
sizeCurrent int
}
// NewCache returns a new instance of Cache.
func NewCache(sizeTotal int) Cache {
return Cache{
sizeTotal: sizeTotal,
}
}
// Get gets an entry by given key
func (c *Cache) Get(k string) *Entry {
if sme, ok := c.Load(k); ok {
e := sme.(*Entry)
if c.expired(e) {
c.Delete(k)
return nil
}
return e
}
return nil
}
// Set adds an entry for given key and value
func (c *Cache) Set(k string, val interface{}, expiration time.Time) {
if !c.fits() {
c.evict()
}
c.Store(k, &Entry{
val,
expiration,
})
c.sizeCurrent++
}
// Unset removes an entry by given key
func (c *Cache) Unset(k string) bool {
if _, ok := c.Load(k); !ok {
return false
}
c.Delete(k)
c.sizeCurrent--
return true
}
// evict frees memory from the cache by removing entries that exceeded the cache TTL.
func (c *Cache) evict() {
c.Range(func(k, sme interface{}) bool {
e := sme.(*Entry)
if c.expired(e) {
c.Delete(k)
c.sizeCurrent--
}
return true
})
}
// expired checks if an entry is expired
func (c *Cache) expired(e *Entry) bool {
return e.expiration.Before(time.Now())
}
// fits returns whether the cache fits more entries.
func (c *Cache) fits() bool {
return c.sizeTotal > c.sizeCurrent
}

View File

@@ -3,6 +3,10 @@ package indexer
import (
"fmt"
"github.com/owncloud/ocis/ocis-pkg/sync"
"path"
"strings"
"github.com/CiscoM31/godata"
"github.com/iancoleman/strcase"
"github.com/owncloud/ocis/ocis-pkg/indexer/config"
@@ -12,17 +16,13 @@ import (
_ "github.com/owncloud/ocis/ocis-pkg/indexer/index/disk" // to populate index
"github.com/owncloud/ocis/ocis-pkg/indexer/option"
"github.com/owncloud/ocis/ocis-pkg/indexer/registry"
"github.com/owncloud/ocis/ocis-pkg/sync"
"path"
"strings"
)
// Indexer is a facade to configure and query over multiple indices.
type Indexer struct {
config *config.Config
indices typeMap
mu sync.NRWMutex
mu sync.NRWMutex
}
// IdxAddResult represents the result of an Add call on an index
@@ -49,8 +49,8 @@ func getRegistryStrategy(cfg *config.Config) string {
// Reset takes care of deleting all indices from storage and from the internal map of indices
func (i *Indexer) Reset() error {
for j := range i.indices.allTypeMappings() {
for _, indices := range i.indices.getTypeMapping(j).IndicesByField {
for j := range i.indices {
for _, indices := range i.indices[j].IndicesByField {
for _, idx := range indices {
err := idx.Delete()
if err != nil {
@@ -58,7 +58,7 @@ func (i *Indexer) Reset() error {
}
}
}
i.indices.deleteTypeMapping(j)
delete(i.indices, j)
}
return nil
@@ -107,7 +107,7 @@ func (i *Indexer) Add(t interface{}) ([]IdxAddResult, error) {
defer i.mu.Unlock(typeName)
var results []IdxAddResult
if fields := i.indices.getTypeMapping(typeName); fields != nil {
if fields, ok := i.indices[typeName]; ok {
for _, indices := range fields.IndicesByField {
for _, idx := range indices {
pkVal := valueOf(t, fields.PKFieldName)
@@ -135,7 +135,7 @@ func (i *Indexer) FindBy(t interface{}, field string, val string) ([]string, err
defer i.mu.RUnlock(typeName)
resultPaths := make([]string, 0)
if fields := i.indices.getTypeMapping(typeName); fields != nil {
if fields, ok := i.indices[typeName]; ok {
for _, idx := range fields.IndicesByField[strcase.ToCamel(field)] {
idxVal := val
res, err := idx.Lookup(idxVal)
@@ -169,7 +169,7 @@ func (i *Indexer) Delete(t interface{}) error {
i.mu.Lock(typeName)
defer i.mu.Unlock(typeName)
if fields := i.indices.getTypeMapping(typeName); fields != nil {
if fields, ok := i.indices[typeName]; ok {
for _, indices := range fields.IndicesByField {
for _, idx := range indices {
pkVal := valueOf(t, fields.PKFieldName)
@@ -192,7 +192,7 @@ func (i *Indexer) FindByPartial(t interface{}, field string, pattern string) ([]
defer i.mu.RUnlock(typeName)
resultPaths := make([]string, 0)
if fields := i.indices.getTypeMapping(typeName); fields != nil {
if fields, ok := i.indices[typeName]; ok {
for _, idx := range fields.IndicesByField[strcase.ToCamel(field)] {
res, err := idx.Search(pattern)
if err != nil {
@@ -231,7 +231,7 @@ func (i *Indexer) Update(from, to interface{}) error {
return fmt.Errorf("update types do not match: from %v to %v", typeNameFrom, typeNameTo)
}
if fields := i.indices.getTypeMapping(typeNameFrom); fields != nil {
if fields, ok := i.indices[typeNameFrom]; ok {
for fName, indices := range fields.IndicesByField {
oldV := valueOf(from, fName)
newV := valueOf(to, fName)

View File

@@ -1,55 +1,27 @@
package indexer
import (
"github.com/owncloud/ocis/ocis-pkg/indexer/index"
"sync"
)
import "github.com/owncloud/ocis/ocis-pkg/indexer/index"
// typeMap stores the indexer layout at runtime.
type fieldName = string
type typeMap map[tName]typeMapping
type tName = string
type typeMap struct {
sync.Map
}
type fieldName = string
type typeMapping struct {
PKFieldName string
IndicesByField map[fieldName][]index.Index
}
func (m *typeMap) allTypeMappings() map[tName]*typeMapping {
var rv map[tName]*typeMapping
m.Range(func(key, value interface{}) bool {
rv[key.(string)] = value.(*typeMapping)
return true
})
return rv
}
func (m *typeMap) getTypeMapping(typeName string) *typeMapping {
if value, ok := m.Load(typeName); ok {
return value.(*typeMapping)
}
return nil
}
func (m *typeMap) deleteTypeMapping(typeName string) {
m.Delete(typeName)
}
func (m *typeMap) addIndex(typeName string, pkName string, idx index.Index) {
if val, ok := m.Load(typeName); ok {
rval := val.(*typeMapping)
rval.IndicesByField[idx.IndexBy()] = append(rval.IndicesByField[idx.IndexBy()], idx)
func (m typeMap) addIndex(typeName string, pkName string, idx index.Index) {
if val, ok := m[typeName]; ok {
val.IndicesByField[idx.IndexBy()] = append(val.IndicesByField[idx.IndexBy()], idx)
return
}
m.Store(typeName, &typeMapping{
m[typeName] = typeMapping{
PKFieldName: pkName,
IndicesByField: map[string][]index.Index{
idx.IndexBy(): {idx},
},
})
}
}

ocis-pkg/sync/cache.go Normal file
View File

@@ -0,0 +1,97 @@
package sync
import (
"sync"
"time"
)
// Cache is a barebones cache implementation.
type Cache struct {
entries sync.Map
pool sync.Pool
capacity int
length int
}
// CacheEntry represents an entry on the cache. You can type assert on V.
type CacheEntry struct {
V interface{}
expiration time.Time
}
// NewCache returns a new instance of Cache.
func NewCache(capacity int) Cache {
return Cache{
capacity: capacity,
pool: sync.Pool{New: func() interface{} {
return new(CacheEntry)
}},
}
}
// Get gets an entry by given key
func (c *Cache) Get(key string) *CacheEntry {
if mapEntry, ok := c.entries.Load(key); ok {
entry := mapEntry.(*CacheEntry)
if c.expired(entry) {
c.entries.Delete(key)
return nil
}
return entry
}
return nil
}
// Set adds an entry for given key and value
func (c *Cache) Set(key string, val interface{}, expiration time.Time) {
if !c.fits() {
c.evict()
}
poolEntry := c.pool.Get()
if mapEntry, loaded := c.entries.LoadOrStore(key, poolEntry); loaded {
entry := mapEntry.(*CacheEntry)
entry.V = val
entry.expiration = expiration
c.pool.Put(poolEntry)
} else {
entry := poolEntry.(*CacheEntry)
entry.V = val
entry.expiration = expiration
c.length++
}
}
// Unset removes an entry by given key
func (c *Cache) Unset(key string) bool {
if _, loaded := c.entries.LoadAndDelete(key); !loaded {
return false
}
c.length--
return true
}
// evict frees memory from the cache by removing entries that exceeded the cache TTL.
func (c *Cache) evict() {
c.entries.Range(func(key, mapEntry interface{}) bool {
entry := mapEntry.(*CacheEntry)
if c.expired(entry) {
c.entries.Delete(key)
c.length--
}
return true
})
}
// expired checks if an entry is expired
func (c *Cache) expired(e *CacheEntry) bool {
return e.expiration.Before(time.Now())
}
// fits returns whether the cache fits more entries.
func (c *Cache) fits() bool {
return c.capacity > c.length
}
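A brief usage sketch of the new Cache, assuming it is imported the same way as in the other files of this commit; the struct and key below are made up for illustration. The tests in the next file exercise the same API.

```go
package main

import (
	"fmt"
	"time"

	"github.com/owncloud/ocis/ocis-pkg/sync"
)

// userInfo is a made-up value type; anything can be stored in V and
// recovered with a type assertion, as the CacheEntry doc comment notes.
type userInfo struct {
	Email string
}

func main() {
	c := sync.NewCache(64)
	c.Set("token-123", userInfo{Email: "jane@example.org"}, time.Now().Add(30*time.Second))

	if entry := c.Get("token-123"); entry != nil {
		fmt.Println(entry.V.(userInfo).Email)
	}

	// Expired or unknown keys return nil.
	fmt.Println(c.Get("unknown") == nil) // true
}
```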

View File

@@ -0,0 +1,54 @@
package sync
import (
"github.com/stretchr/testify/assert"
"strconv"
"testing"
"time"
)
func TestCache_Get(t *testing.T) {
size := 1024
c := NewCache(size)
for i := 0; i < size; i++ {
c.Set(strconv.Itoa(i), i, time.Now().Add(10*time.Second))
}
for i := 0; i < size; i++ {
assert.Equal(t, i, c.Get(strconv.Itoa(i)).V, "entry value is the same")
}
assert.Nil(t, c.Get("unknown"), "entry is nil if unknown")
wait := 10 * time.Millisecond
c.Set("expired", size, time.Now().Add(wait))
time.Sleep(wait + 1)
assert.Nil(t, c.Get(strconv.Itoa(size)), "entry is nil if it's expired")
}
func TestCache_Set(t *testing.T) {
c := NewCache(1)
c.Set("new", "new", time.Now().Add(10*time.Millisecond))
assert.Equal(t, "new", c.Get("new").V, "new entries can be added")
assert.Equal(t, 1, c.length, "adding new entries will increase the cache size")
replacedExpiration := time.Now().Add(10 * time.Millisecond)
c.Set("new", "updated", replacedExpiration)
assert.Equal(t, "updated", c.Get("new").V, "entry values can be updated")
assert.Equal(t, replacedExpiration, c.Get("new").expiration, "entry expiration can be updated")
time.Sleep(11 * time.Millisecond)
c.Set("eviction", "eviction", time.Now())
assert.Equal(t, 1, c.length, "expired entries get removed")
}
func TestCache_Unset(t *testing.T) {
c := NewCache(1)
c.Set("new", "new", time.Now().Add(10*time.Millisecond))
c.Unset("new")
assert.Nil(t, c.Get("new"), "entries can be removed")
assert.Equal(t, 0, c.length, "removing a entry decreases the cache size")
}

View File

@@ -1,74 +0,0 @@
package sync
import (
"sync"
)
// NRWMutex works the same as RWMutex, the only difference is that it stores mutexes in a map and reuses them.
// It's handy if you want to write-lock, write-unlock, read-lock and read-unlock for specific names only.
type NRWMutex struct {
m sync.Mutex
mm map[string]*nrw
}
type nrw struct {
m sync.RWMutex
c int
}
// NewNRWMutex returns a new instance of NRWMutex.
func NewNRWMutex() NRWMutex {
return NRWMutex{mm: make(map[string]*nrw)}
}
// Lock locks rw for writing.
func (c *NRWMutex) Lock(k string) {
c.m.Lock()
m := c.get(k)
m.c++
c.m.Unlock()
m.m.Lock()
}
// Unlock unlocks rw for writing.
func (c *NRWMutex) Unlock(k string) {
c.m.Lock()
defer c.m.Unlock()
m := c.get(k)
m.m.Unlock()
m.c--
if m.c == 0 {
delete(c.mm, k)
}
}
// RLock locks rw for reading.
func (c *NRWMutex) RLock(k string) {
c.m.Lock()
m := c.get(k)
m.c++
c.m.Unlock()
m.m.RLock()
}
// RUnlock undoes a single RLock call.
func (c *NRWMutex) RUnlock(k string) {
c.m.Lock()
defer c.m.Unlock()
m := c.get(k)
m.m.RUnlock()
m.c--
if m.c == 0 {
delete(c.mm, k)
}
}
func (c *NRWMutex) get(k string) *nrw {
m, ok := c.mm[k]
if !ok {
m = &nrw{}
c.mm[k] = m
}
return m
}

ocis-pkg/sync/nrwmutex.go Normal file
View File

@@ -0,0 +1,49 @@
package sync
import (
"sync"
)
// NRWMutex works the same as RWMutex, the only difference is that it stores mutexes in a map and reuses them.
// It's handy if you want to write-lock, write-unlock, read-lock and read-unlock for specific names only.
type NRWMutex struct {
pool sync.Pool
mus sync.Map
}
// NewNRWMutex returns a new instance of NRWMutex.
func NewNRWMutex() NRWMutex {
return NRWMutex{pool: sync.Pool{New: func() interface{} {
return new(sync.RWMutex)
}}}
}
// Lock locks rw for writing.
func (m *NRWMutex) Lock(name string) {
m.loadOrStore(name).Lock()
}
// Unlock unlocks rw for writing.
func (m *NRWMutex) Unlock(name string) {
m.loadOrStore(name).Unlock()
}
// RLock locks rw for reading.
func (m *NRWMutex) RLock(name string) {
m.loadOrStore(name).RLock()
}
// RUnlock undoes a single RLock call.
func (m *NRWMutex) RUnlock(name string) {
m.loadOrStore(name).RUnlock()
}
func (m *NRWMutex) loadOrStore(name string) *sync.RWMutex {
pmu := m.pool.Get()
mmu, loaded := m.mus.LoadOrStore(name, pmu)
if loaded {
m.pool.Put(pmu)
}
return mmu.(*sync.RWMutex)
}
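A small usage sketch of the new NRWMutex; the per-name counter workload is invented purely to show that only goroutines sharing a name serialize on the lock. The stdlib sync package is aliased because the ocis package is also named sync.

```go
package main

import (
	"fmt"
	gosync "sync"

	"github.com/owncloud/ocis/ocis-pkg/sync"
)

func main() {
	mu := sync.NewNRWMutex()

	// Pre-populated per-name counters; the map itself is only read below.
	counters := map[string]*int{"users": new(int), "groups": new(int)}

	var wg gosync.WaitGroup
	for name, counter := range counters {
		name, counter := name, counter
		for w := 0; w < 4; w++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for i := 0; i < 1000; i++ {
					mu.Lock(name) // only goroutines sharing this name serialize here
					*counter++
					mu.Unlock(name)
				}
			}()
		}
	}
	wg.Wait()
	fmt.Println(*counters["users"], *counters["groups"]) // 4000 4000
}
```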

View File

@@ -6,16 +6,16 @@ import (
"testing"
)
func HammerMutex(m *NRWMutex, loops int, cdone chan bool) {
func HammerMutex(m *NRWMutex, loops int, c chan bool) {
for i := 0; i < loops; i++ {
id := fmt.Sprintf("%v", i)
m.Lock(id)
m.Unlock(id)
}
cdone <- true
c <- true
}
func TestMutex(t *testing.T) {
func TestNRWMutex(t *testing.T) {
if n := runtime.SetMutexProfileFraction(1); n != 0 {
t.Logf("got mutexrate %d expected 0", n)
}

View File

@@ -10,9 +10,9 @@ import (
"github.com/dgrijalva/jwt-go"
gOidc "github.com/coreos/go-oidc"
"github.com/owncloud/ocis/ocis-pkg/cache"
"github.com/owncloud/ocis/ocis-pkg/log"
"github.com/owncloud/ocis/ocis-pkg/oidc"
"github.com/owncloud/ocis/ocis-pkg/sync"
"golang.org/x/oauth2"
)
@@ -24,7 +24,7 @@ type OIDCProvider interface {
// OIDCAuth provides a middleware to check access secured by a static token.
func OIDCAuth(optionSetters ...Option) func(next http.Handler) http.Handler {
options := newOptions(optionSetters...)
tokenCache := cache.NewCache(options.UserinfoCacheSize)
tokenCache := sync.NewCache(options.UserinfoCacheSize)
h := oidcAuth{
logger: options.Logger,
@@ -74,7 +74,7 @@ type oidcAuth struct {
providerFunc func() (OIDCProvider, error)
httpClient *http.Client
oidcIss string
tokenCache *cache.Cache
tokenCache *sync.Cache
tokenCacheTTL time.Duration
}