use plain pkg module

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
Authored by Jörn Friedrich Dreyer on 2025-01-13 15:54:00 +01:00
Committed by Florian Schade
parent 259cbc2e56
commit b07b5a1149
841 changed files with 1383 additions and 1366 deletions

pkg/sync/cache.go (new file, 94 lines)

@@ -0,0 +1,94 @@
package sync

import (
	"sync"
	"sync/atomic"
	"time"
)

// Cache is a barebones cache implementation.
type Cache struct {
	// capacity and length have to be the first words
	// in order to be 64-bit aligned on 32-bit architectures.
	capacity, length uint64 // access atomically
	entries          sync.Map
	pool             sync.Pool
}

// CacheEntry represents an entry in the cache. Type assert on V to get the stored value.
type CacheEntry struct {
	V          interface{}
	expiration time.Time
}

// NewCache returns a new instance of Cache with the given capacity.
func NewCache(capacity int) Cache {
	return Cache{
		capacity: uint64(capacity),
		pool: sync.Pool{New: func() interface{} {
			return new(CacheEntry)
		}},
	}
}

// Load returns the entry for the given key, or nil if it is missing or expired.
func (c *Cache) Load(key string) *CacheEntry {
	if mapEntry, ok := c.entries.Load(key); ok {
		entry := mapEntry.(*CacheEntry)
		if c.expired(entry) {
			c.entries.Delete(key)
			return nil
		}
		return entry
	}
	return nil
}
// Store adds or updates the entry for the given key.
func (c *Cache) Store(key string, val interface{}, expiration time.Time) {
	// length is incremented with atomic.AddUint64, so it has to be read atomically as well.
	if atomic.LoadUint64(&c.length) > c.capacity {
		c.evict()
	}

	poolEntry := c.pool.Get() //nolint: ifshort
	if mapEntry, loaded := c.entries.LoadOrStore(key, poolEntry); loaded {
		// The key already exists: update the stored entry and return the unused pooled one.
		entry := mapEntry.(*CacheEntry)
		entry.V = val
		entry.expiration = expiration
		c.pool.Put(poolEntry)
	} else {
		// The pooled entry was stored for this key: initialize it and bump the length counter.
		entry := poolEntry.(*CacheEntry)
		entry.V = val
		entry.expiration = expiration
		atomic.AddUint64(&c.length, 1)
	}
}
// Delete removes the entry for the given key and reports whether it existed.
func (c *Cache) Delete(key string) bool {
	_, loaded := c.entries.LoadAndDelete(key)
	if loaded {
		// Adding ^uint64(0) decrements the unsigned length counter by one.
		atomic.AddUint64(&c.length, ^uint64(0))
	}

	return loaded
}

// evict frees memory from the cache by removing entries that exceeded the cache TTL.
func (c *Cache) evict() {
	c.entries.Range(func(key, mapEntry interface{}) bool {
		entry := mapEntry.(*CacheEntry)
		if c.expired(entry) {
			c.Delete(key.(string))
		}
		return true
	})
}

// expired checks whether an entry has passed its expiration time.
func (c *Cache) expired(e *CacheEntry) bool {
	return e.expiration.Before(time.Now())
}
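
For context, a minimal usage sketch of this cache follows. It is not part of the commit; the key, value, and TTL are made up for illustration, and the snippet assumes it runs inside this sync package.

	// Illustrative example: store a value for one minute, read it back, then drop it.
	c := NewCache(1024)
	c.Store("user:42", "Jane", time.Now().Add(time.Minute))

	if entry := c.Load("user:42"); entry != nil {
		name := entry.V.(string) // type assert on V to get the stored value
		_ = name
	}

	c.Delete("user:42")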

pkg/sync/cache_test.go (new file, 105 lines)

@@ -0,0 +1,105 @@
package sync

import (
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func cacheRunner(size int) (*Cache, func(f func(v string))) {
	c := NewCache(size)
	run := func(f func(v string)) {
		wg := sync.WaitGroup{}
		for i := 0; i < size; i++ {
			wg.Add(1)
			go func(v string) {
				f(v)
				wg.Done()
			}(strconv.Itoa(i))
		}
		wg.Wait()
	}

	return &c, run
}

func BenchmarkCache(b *testing.B) {
	b.ReportAllocs()
	size := 1024
	c, cr := cacheRunner(size)

	cr(func(v string) { c.Store(v, v, time.Now().Add(100*time.Millisecond)) })
	cr(func(v string) { c.Delete(v) })
}

func TestCache(t *testing.T) {
	size := 1024
	c, cr := cacheRunner(size)

	cr(func(v string) { c.Store(v, v, time.Now().Add(100*time.Millisecond)) })
	assert.Equal(t, size, int(c.length), "length is atomic")

	cr(func(v string) { c.Delete(v) })
	assert.Equal(t, 0, int(c.length), "delete is atomic")

	cr(func(v string) {
		time.Sleep(101 * time.Millisecond)
		c.evict()
	})
	assert.Equal(t, 0, int(c.length), "evict is atomic")
}

func TestCache_Load(t *testing.T) {
	size := 1024
	c, cr := cacheRunner(size)

	cr(func(v string) {
		c.Store(v, v, time.Now().Add(10*time.Second))
	})

	cr(func(v string) {
		assert.Equal(t, v, c.Load(v).V, "entry value is the same")
	})

	cr(func(v string) {
		assert.Nil(t, c.Load(v+strconv.Itoa(size)), "entry is nil if unknown")
	})

	cr(func(v string) {
		wait := 100 * time.Millisecond
		c.Store(v, v, time.Now().Add(wait))
		time.Sleep(wait + 1)
		assert.Nil(t, c.Load(v), "entry is nil if it's expired")
	})
}

func TestCache_Store(t *testing.T) {
	c, cr := cacheRunner(1024)

	cr(func(v string) {
		c.Store(v, v, time.Now().Add(100*time.Millisecond))
		assert.Equal(t, v, c.Load(v).V, "new entries can be added")
	})

	cr(func(v string) {
		replacedExpiration := time.Now().Add(10 * time.Minute)
		c.Store(v, "old", time.Now().Add(10*time.Minute))
		c.Store(v, "updated", replacedExpiration)
		assert.Equal(t, "updated", c.Load(v).V, "entry values can be updated")
		assert.Equal(t, replacedExpiration, c.Load(v).expiration, "entry expiration can be updated")
	})
}

func TestCache_Delete(t *testing.T) {
	c, cr := cacheRunner(1024)

	cr(func(v string) {
		c.Store(v, v, time.Now().Add(100*time.Millisecond))
		c.Delete(v)
		assert.Nil(t, c.Load(v), "entries can be deleted")
	})

	assert.Equal(t, 0, int(c.length), "removing an entry decreases the cache size")
}

pkg/sync/mutex.go (new file, 49 lines)

@@ -0,0 +1,49 @@
package sync

import (
	"sync"
)

// NamedRWMutex works the same as RWMutex, the only difference is that it stores mutexes in a map and reuses them.
// It's handy if you want to write-lock, write-unlock, read-lock and read-unlock for specific names only.
type NamedRWMutex struct {
	pool sync.Pool
	mus  sync.Map
}

// NewNamedRWMutex returns a new instance of NamedRWMutex.
func NewNamedRWMutex() NamedRWMutex {
	return NamedRWMutex{pool: sync.Pool{New: func() interface{} {
		return new(sync.RWMutex)
	}}}
}

// Lock locks the mutex for the given name for writing.
func (m *NamedRWMutex) Lock(name string) {
	m.loadOrStore(name).Lock()
}

// Unlock unlocks the mutex for the given name for writing.
func (m *NamedRWMutex) Unlock(name string) {
	m.loadOrStore(name).Unlock()
}

// RLock locks the mutex for the given name for reading.
func (m *NamedRWMutex) RLock(name string) {
	m.loadOrStore(name).RLock()
}

// RUnlock undoes a single RLock call for the given name.
func (m *NamedRWMutex) RUnlock(name string) {
	m.loadOrStore(name).RUnlock()
}

// loadOrStore returns the mutex for the given name, creating it on first use.
// A pooled mutex is used as the candidate so that losing the LoadOrStore race does not waste an allocation.
func (m *NamedRWMutex) loadOrStore(name string) *sync.RWMutex {
	pmu := m.pool.Get()
	mmu, loaded := m.mus.LoadOrStore(name, pmu)
	if loaded {
		m.pool.Put(pmu)
	}

	return mmu.(*sync.RWMutex)
}
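
A short usage sketch follows. It is not part of the commit; the space ID and the two functions are invented to show the per-name locking pattern.

	// Illustrative example: serialize writers per space ID while allowing concurrent readers.
	var spaceLocks = NewNamedRWMutex()

	func updateSpace(id string) {
		spaceLocks.Lock(id)
		defer spaceLocks.Unlock(id)
		// mutate state that belongs to this id only
	}

	func readSpace(id string) {
		spaceLocks.RLock(id)
		defer spaceLocks.RUnlock(id)
		// read state for this id without blocking other readers
	}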

pkg/sync/mutex_test.go (new file, 33 lines)

@@ -0,0 +1,33 @@
package sync

import (
	"fmt"
	"runtime"
	"testing"
)

func HammerMutex(m *NamedRWMutex, loops int, c chan bool) {
	for i := 0; i < loops; i++ {
		id := fmt.Sprintf("%v", i)
		m.Lock(id)
		m.Unlock(id)
	}
	c <- true
}

func TestNamedRWMutex(t *testing.T) {
	if n := runtime.SetMutexProfileFraction(1); n != 0 {
		t.Logf("got mutexrate %d expected 0", n)
	}
	defer runtime.SetMutexProfileFraction(0)

	m := NewNamedRWMutex()
	c := make(chan bool)
	r := 10

	for i := 0; i < r; i++ {
		go HammerMutex(&m, 2000, c)
	}

	for i := 0; i < r; i++ {
		<-c
	}
}

pkg/sync/sync.go (new file, 8 lines)

@@ -0,0 +1,8 @@
package sync

import "sync"

var (
	// ParsingViperConfig addresses the fact that config parsing using Viper is not thread safe.
	ParsingViperConfig sync.Mutex
)
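
The intent is presumably that callers hold this mutex around any Viper parsing. A minimal sketch, assuming the standard spf13/viper API; the import alias, module path, and loadConfig helper below are hypothetical, not part of the commit.

	// Illustrative example: guard non-thread-safe Viper parsing with the shared mutex.
	import (
		pkgsync "example.com/ocis/pkg/sync" // hypothetical module path for this package
		"github.com/spf13/viper"
	)

	func loadConfig(path string, cfg interface{}) error {
		pkgsync.ParsingViperConfig.Lock()
		defer pkgsync.ParsingViperConfig.Unlock()

		viper.SetConfigFile(path)
		if err := viper.ReadInConfig(); err != nil {
			return err
		}
		return viper.Unmarshal(cfg)
	}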