Update module github.com/maypok86/otter/v2 to v2.1.0 (#180)

Co-authored-by: Renovate Bot <renovate@whitesourcesoftware.com>
Author: Taras
Date: 2025-06-30 13:42:15 +00:00
Committed by: GitHub
Parent: d6440d7d8b
Commit: 21ceaaf180
26 changed files with 1497 additions and 573 deletions

go.mod (2 lines changed)

@@ -14,7 +14,7 @@ require (
github.com/joho/godotenv v1.5.1
github.com/jpillora/backoff v1.0.0
github.com/justinas/alice v1.2.0
github.com/maypok86/otter/v2 v2.0.0
github.com/maypok86/otter/v2 v2.1.0
github.com/prometheus/client_golang v1.22.0
github.com/realclientip/realclientip-go v1.0.0
github.com/rs/cors v1.11.1

go.sum (2 lines changed)

@@ -126,6 +126,8 @@ github.com/maypok86/otter/v2 v2.0.0-20250616223213-73ec080fc790 h1:omxwgqzRsRWJp
github.com/maypok86/otter/v2 v2.0.0-20250616223213-73ec080fc790/go.mod h1:jX2xEKz9PrNVbDqnk8JUuOt5kURK8h7jd1kDYI5QsZk=
github.com/maypok86/otter/v2 v2.0.0 h1:t+A6nUOQJZTRAQpDGltBKZBcRDjFfLBPTxEZGFfh48A=
github.com/maypok86/otter/v2 v2.0.0/go.mod h1:jX2xEKz9PrNVbDqnk8JUuOt5kURK8h7jd1kDYI5QsZk=
github.com/maypok86/otter/v2 v2.1.0 h1:H+FO9NtLuSWYUlIUQ/kT6VNEpWSIF4w4GZJRDhxYb7k=
github.com/maypok86/otter/v2 v2.1.0/go.mod h1:jX2xEKz9PrNVbDqnk8JUuOt5kURK8h7jd1kDYI5QsZk=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=


@@ -1,3 +1,24 @@
## 2.1.0 - 2025-06-29
### ✨ Features
- Added `Compute`, `ComputeIfAbsent` and `ComputeIfPresent` methods
- Added `LoadCacheFrom`, `LoadCacheFromFile`, `SaveCacheTo` and `SaveCacheToFile` functions
- Added `Clock` interface and option for time mocking
- Added `Keys` and `Values` iterators
- Added `Hottest` and `Coldest` iterators
### 🚀 Improvements
- Slightly reduced memory consumption
- The cache is now significantly faster when it is lightly populated
- Reduced number of allocations during refresh
### 🐞 Bug Fixes
- Fixed a bug in timer wheel ([#64](https://github.com/Yiling-J/theine-go/issues/64))
- Added usage of `context.WithoutCancel` during refresh execution ([#124](https://github.com/maypok86/otter/issues/124))
## 2.0.0 - 2025-06-18
### 📝 Description


@@ -4,7 +4,7 @@
</p>
<p align="center">
<a href="https://pkg.go.dev/github.com/maypok86/otter"><img src="https://pkg.go.dev/badge/github.com/maypok86/otter.svg" alt="Go Reference"></a>
<a href="https://pkg.go.dev/github.com/maypok86/otter/v2"><img src="https://pkg.go.dev/badge/github.com/maypok86/otter/v2.svg" alt="Go Reference"></a>
<img src="https://github.com/maypok86/otter/actions/workflows/test.yml/badge.svg" />
<a href="https://github.com/maypok86/otter/actions?query=branch%3Amain+workflow%3ATest" >
<img src="https://gist.githubusercontent.com/maypok86/2aae2cd39836dc7c258df7ffec602d1c/raw/coverage.svg"/></a>
@@ -34,7 +34,7 @@ Otter is designed to provide an excellent developer experience while maintaining
Performance-wise, Otter provides:
- [High hit rates](https://maypok86.github.io/otter/performance/hit-ratio/) across all workload types via adaptive W-TinyLFU
- [High hit rates](https://maypok86.github.io/otter/performance/hit-ratio/) across all workload types via [adaptive W-TinyLFU](https://dl.acm.org/citation.cfm?id=3274816)
- [Excellent throughput](https://maypok86.github.io/otter/performance/throughput/) under high contention on most workload types
- Among the lowest [memory overheads](https://maypok86.github.io/otter/performance/memory-consumption/) across all cache capacities
- Automatic data structures configuration based on contention/parallelism and workload patterns
@@ -42,10 +42,12 @@ Performance-wise, Otter provides:
Otter also provides a highly configurable caching API, enabling any combination of these optional features:
- Size-based [eviction](https://maypok86.github.io/otter/user-guide/v2/features/eviction/#size-based) when a maximum is exceeded
- Time-based [expiration](https://maypok86.github.io/otter/user-guide/v2/features/eviction/#time-based) of entries (using [Hierarchical Timing Wheel](http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf)), measured since last access or last write
- Time-based [expiration](https://maypok86.github.io/otter/user-guide/v2/features/eviction/#time-based) of entries, measured since last access or last write
- [Automatic loading](https://maypok86.github.io/otter/user-guide/v2/features/loading/) of entries into the cache
- [Asynchronously refresh](https://maypok86.github.io/otter/user-guide/v2/features/refresh/) when the first stale request for an entry occurs
- [Writes propagated](https://maypok86.github.io/otter/user-guide/v2/features/compute/) to an external resource
- Accumulation of cache access [statistics](https://maypok86.github.io/otter/user-guide/v2/features/statistics/)
- [Saving cache](https://maypok86.github.io/otter/user-guide/v2/features/persistence/) to a file and loading cache from a file
## 📚 Usage <a id="usage" />
@@ -125,16 +127,13 @@ func main() {
// Phase 2: Test cache stampede protection
// --------------------------------------
loader := otter.LoaderFunc[string, string](func(ctx context.Context, key string) (string, error) {
if key != "key" {
panic("incorrect key") // Validate key
}
loader := func(ctx context.Context, key string) (string, error) {
time.Sleep(200 * time.Millisecond) // Simulate slow load
return "value1", nil // Return new value
})
}
// Concurrent Gets would deduplicate loader calls
value, err := cache.Get(ctx, "key", loader)
value, err := cache.Get(ctx, "key", otter.LoaderFunc[string, string](loader))
if err != nil {
panic(err)
}
@@ -148,15 +147,12 @@ func main() {
// New loader that returns updated value
loader = func(ctx context.Context, key string) (string, error) {
if key != "key" {
panic("incorrect key")
}
time.Sleep(100 * time.Millisecond) // Simulate refresh
return "value2", nil // Return refreshed value
}
// This triggers async refresh but returns current value
value, err = cache.Get(ctx, "key", loader)
value, err = cache.Get(ctx, "key", otter.LoaderFunc[string, string](loader))
if err != nil {
panic(err)
}
@@ -221,7 +217,7 @@ Otter is based on the following papers:
- [TinyLFU: A Highly Efficient Cache Admission Policy](https://dl.acm.org/citation.cfm?id=3149371)
- [Adaptive Software Cache Management](https://dl.acm.org/citation.cfm?id=3274816)
- [Denial of Service via Algorithmic Complexity Attack](https://www.usenix.org/legacy/events/sec03/tech/full_papers/crosby/crosby.pdf)
- [Hashed and Hierarchical Timing Wheels](http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf)
- [Hashed and Hierarchical Timing Wheels](https://ieeexplore.ieee.org/document/650142)
- [A large scale analysis of hundreds of in-memory cache clusters at Twitter](https://www.usenix.org/system/files/osdi20-yang.pdf)
## 👏 Contribute <a id="contribute" />


@@ -21,6 +21,37 @@ import (
"time"
)
// ComputeOp tells the Compute methods what to do.
type ComputeOp int
const (
// CancelOp signals to Compute to not do anything as a result
// of executing the lambda. If the entry was not present in
// the map, nothing happens, and if it was present, the
// returned value is ignored.
CancelOp ComputeOp = iota
// WriteOp signals to Compute to update the entry to the
// value returned by the lambda, creating it if necessary.
WriteOp
// InvalidateOp signals to Compute to always discard the entry
// from the cache.
InvalidateOp
)
var computeOpStrings = []string{
"CancelOp",
"WriteOp",
"InvalidateOp",
}
// String implements the [fmt.Stringer] interface.
func (co ComputeOp) String() string {
if co >= 0 && int(co) < len(computeOpStrings) {
return computeOpStrings[co]
}
return "<unknown otter.ComputeOp>"
}
// Cache is an in-memory cache implementation that supports full concurrency of retrievals and multiple ways to bound the cache.
type Cache[K comparable, V any] struct {
cache *cache[K, V]
@@ -100,6 +131,86 @@ func (c *Cache[K, V]) SetIfAbsent(key K, value V) (V, bool) {
return c.cache.SetIfAbsent(key, value)
}
// Compute either sets the computed new value for the key,
// invalidates the value for the key, or does nothing, based on
// the returned [ComputeOp]. When the op returned by remappingFunc
// is [WriteOp], the value is updated to the new value. If
// it is [InvalidateOp], the entry is removed from the cache
// altogether. And finally, if the op is [CancelOp] then the
// entry is left as-is. In other words, if it did not already
// exist, it is not created, and if it did exist, it is not
// updated. This is useful to synchronously execute some
// operation on the value without incurring the cost of
// updating the cache every time.
//
// The ok result indicates whether the entry is present in the cache after the compute operation.
// The actualValue result contains the cached value
// if a corresponding entry is present, or the zero value
// otherwise. You can think of these results as equivalent to regular key-value lookups in a map.
//
// This call locks a hash table bucket while the compute function
// is executed. It means that modifications on other entries in
// the bucket will be blocked until the remappingFunc executes. Consider
// this when the function includes long-running operations.
func (c *Cache[K, V]) Compute(
key K,
remappingFunc func(oldValue V, found bool) (newValue V, op ComputeOp),
) (actualValue V, ok bool) {
return c.cache.Compute(key, remappingFunc)
}
// ComputeIfAbsent returns the existing value for the key if
// present. Otherwise, it tries to compute the value using the
// provided function. If mappingFunc returns true as the cancel
// value, the computation is cancelled and the zero value for
// type V is returned.
//
// The ok result indicates whether the entry is present in the cache after the compute operation.
// The actualValue result contains the cached value
// if a corresponding entry is present, or the zero value
// otherwise. You can think of these results as equivalent to regular key-value lookups in a map.
//
// This call locks a hash table bucket while the compute function
// is executed. It means that modifications on other entries in
// the bucket will be blocked until mappingFunc executes. Consider
// this when the function includes long-running operations.
func (c *Cache[K, V]) ComputeIfAbsent(
key K,
mappingFunc func() (newValue V, cancel bool),
) (actualValue V, ok bool) {
return c.cache.ComputeIfAbsent(key, mappingFunc)
}
// ComputeIfPresent returns the zero value for type V if the key is not found.
// Otherwise, it tries to compute the value using the provided function.
//
// ComputeIfPresent either sets the computed new value for the key,
// invalidates the value for the key, or does nothing, based on
// the returned [ComputeOp]. When the op returned by remappingFunc
// is [WriteOp], the value is updated to the new value. If
// it is [InvalidateOp], the entry is removed from the cache
// altogether. And finally, if the op is [CancelOp] then the
// entry is left as-is. In other words, if it did not already
// exist, it is not created, and if it did exist, it is not
// updated. This is useful to synchronously execute some
// operation on the value without incurring the cost of
// updating the cache every time.
//
// The ok result indicates whether the entry is present in the cache after the compute operation.
// The actualValue result contains the cached value
// if a corresponding entry is present, or the zero value
// otherwise. You can think of these results as equivalent to regular key-value lookups in a map.
//
// This call locks a hash table bucket while the compute function
// is executed. It means that modifications on other entries in
// the bucket will be blocked until remappingFunc executes. Consider
// this when the function includes long-running operations.
func (c *Cache[K, V]) ComputeIfPresent(
key K,
remappingFunc func(oldValue V) (newValue V, op ComputeOp),
) (actualValue V, ok bool) {
return c.cache.ComputeIfPresent(key, remappingFunc)
}
// SetExpiresAfter specifies that the entry should be automatically removed from the cache once the duration has
// elapsed. The expiration policy determines when the entry's age is reset.
func (c *Cache[K, V]) SetExpiresAfter(key K, expiresAfter time.Duration) {
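To make the semantics above concrete, here is a minimal sketch of the three compute methods. The construction via otter.Must with MaximumSize follows the v2 README example rather than this diff, and the counter workload is purely illustrative.

package main

import (
	"fmt"

	"github.com/maypok86/otter/v2"
)

func main() {
	cache := otter.Must(&otter.Options[string, int]{MaximumSize: 1_000})

	// Compute: create or update an entry in a single atomic step.
	hits, ok := cache.Compute("page", func(old int, found bool) (int, otter.ComputeOp) {
		if !found {
			return 1, otter.WriteOp // create the counter
		}
		return old + 1, otter.WriteOp // increment it
	})
	fmt.Println(hits, ok) // 1 true

	// ComputeIfAbsent: an existing entry wins; cancel=false writes otherwise.
	v, _ := cache.ComputeIfAbsent("page", func() (int, bool) {
		return 42, false
	})
	fmt.Println(v) // 1

	// ComputeIfPresent: inspect and conditionally invalidate.
	cache.ComputeIfPresent("page", func(old int) (int, otter.ComputeOp) {
		if old > 100 {
			return 0, otter.InvalidateOp // drop the entry entirely
		}
		return old, otter.CancelOp // read-only: leave the entry as-is
	})
}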
@@ -124,6 +235,10 @@ func (c *Cache[K, V]) SetRefreshableAfter(key K, refreshableAfter time.Duration)
//
// No observable state associated with this cache is modified until loading completes.
//
// WARNING: When performing a refresh (see [RefreshCalculator]),
// the [Loader] will receive a context wrapped in [context.WithoutCancel].
// If you need to control refresh cancellation, you can use closures or values stored in the context.
//
// WARNING: [Loader] must not attempt to update any mappings of this cache directly.
//
// WARNING: For any given key, every loader used with it should compute the same value.
@@ -146,6 +261,10 @@ func (c *Cache[K, V]) Get(ctx context.Context, key K, loader Loader[K, V]) (V, e
//
// NOTE: duplicate elements in keys will be ignored.
//
// WARNING: When performing a refresh (see [RefreshCalculator]),
// the [BulkLoader] will receive a context wrapped in [context.WithoutCancel].
// If you need to control refresh cancellation, you can use closures or values stored in the context.
//
// WARNING: [BulkLoader] must not attempt to update any mappings of this cache directly.
//
// WARNING: For any given key, every bulkLoader used with it should compute the same value.
@@ -170,6 +289,10 @@ func (c *Cache[K, V]) BulkGet(ctx context.Context, keys []K, bulkLoader BulkLoad
//
// Refresh returns a channel that will receive the result when it is ready. The returned channel will not be closed.
//
// WARNING: When performing a refresh (see [RefreshCalculator]),
// the [Loader] will receive a context wrapped in [context.WithoutCancel].
// If you need to control refresh cancellation, you can use closures or values stored in the context.
//
// WARNING: If the cache was constructed without [RefreshCalculator], then Refresh will return the nil channel.
//
// WARNING: Loader.Load and Loader.Reload must not attempt to update any mappings of this cache directly.
@@ -197,6 +320,10 @@ func (c *Cache[K, V]) Refresh(ctx context.Context, key K, loader Loader[K, V]) <
//
// NOTE: duplicate elements in keys will be ignored.
//
// WARNING: When performing a refresh (see [RefreshCalculator]),
// the [BulkLoader] will receive a context wrapped in [context.WithoutCancel].
// If you need to control refresh cancellation, you can use closures or values stored in the context.
//
// WARNING: If the cache was constructed without [RefreshCalculator], then BulkRefresh will return the nil channel.
//
// WARNING: BulkLoader.BulkLoad and BulkLoader.BulkReload must not attempt to update any mappings of this cache directly.
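The warning above repeats for Get, BulkGet, Refresh, and BulkRefresh: during a refresh the loader's context is wrapped in context.WithoutCancel, so deadlines and cancellation signals do not propagate into it. Below is a sketch of the value-based workaround the docs hint at; the ctxKey type and the simulated backend call are illustrative and not part of otter.

package main

import (
	"context"
	"fmt"
	"time"
)

// ctxKey carries a deadline as a context *value*, because values
// (unlike cancellation) survive context.WithoutCancel.
type ctxKey struct{}

// loadUser matches the otter.LoaderFunc shape and re-derives a
// cancellable context from the smuggled deadline.
func loadUser(ctx context.Context, key string) (string, error) {
	if deadline, ok := ctx.Value(ctxKey{}).(time.Time); ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}
	select {
	case <-time.After(10 * time.Millisecond): // simulate a backend call
		return "value for " + key, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	ctx := context.WithValue(context.Background(), ctxKey{}, time.Now().Add(50*time.Millisecond))
	fmt.Println(loadUser(ctx, "user:42"))
}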
@@ -228,6 +355,26 @@ func (c *Cache[K, V]) All() iter.Seq2[K, V] {
return c.cache.All()
}
// Keys returns an iterator over all keys in the cache.
// The iteration order is not specified and is not guaranteed to be the same from one call to the next.
//
// The iterator is at least weakly consistent: it is safe for concurrent use,
// but if the cache is modified (including by eviction) after the iterator is
// created, it is undefined which of the changes (if any) will be reflected in that iterator.
func (c *Cache[K, V]) Keys() iter.Seq[K] {
return c.cache.Keys()
}
// Values returns an iterator over all values in the cache.
// The iteration order is not specified and is not guaranteed to be the same from one call to the next.
//
// The iterator is at least weakly consistent: it is safe for concurrent use,
// but if the cache is modified (including by eviction) after the iterator is
// created, it is undefined which of the changes (if any) will be reflected in that iterator.
func (c *Cache[K, V]) Values() iter.Seq[V] {
return c.cache.Values()
}
// InvalidateAll discards all entries in the cache. The behavior of this operation is undefined for an entry
// that is being loaded (or reloaded) and is otherwise not present.
func (c *Cache[K, V]) InvalidateAll() {
@@ -249,7 +396,7 @@ func (c *Cache[K, V]) SetMaximum(maximum uint64) {
}
// GetMaximum returns the maximum total weighted or unweighted size of this cache, depending on how the
// cache was constructed.
// cache was constructed. If this cache does not use a (weighted) size bound, then the method will return math.MaxUint64.
func (c *Cache[K, V]) GetMaximum() uint64 {
return c.cache.GetMaximum()
}
@@ -268,6 +415,30 @@ func (c *Cache[K, V]) WeightedSize() uint64 {
return c.cache.WeightedSize()
}
// Hottest returns an iterator for ordered traversal of the cache entries. The order of
// iteration is from the entries most likely to be retained (hottest) to the entries least
// likely to be retained (coldest). This order is determined by the eviction policy's best guess
// at the start of the iteration.
//
// WARNING: Beware that this iteration is performed within the eviction policy's exclusive lock, so the
// iteration should be short and simple. While the iteration is in progress further eviction
// maintenance will be halted.
func (c *Cache[K, V]) Hottest() iter.Seq[Entry[K, V]] {
return c.cache.Hottest()
}
// Coldest returns an iterator for ordered traversal of the cache entries. The order of
// iteration is from the entries least likely to be retained (coldest) to the entries most
// likely to be retained (hottest). This order is determined by the eviction policy's best guess
// at the start of the iteration.
//
// WARNING: Beware that this iteration is performed within the eviction policy's exclusive lock, so the
// iteration should be short and simple. While the iteration is in progress further eviction
// maintenance will be halted.
func (c *Cache[K, V]) Coldest() iter.Seq[Entry[K, V]] {
return c.cache.Coldest()
}
func (c *Cache[K, V]) has(key K) bool {
return c.cache.has(key)
}
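The new iterators compose with Go 1.23 range-over-func and the standard slices helpers. A minimal sketch, assuming a small cache built with otter.Must (the capacity and sample keys are illustrative):

package main

import (
	"fmt"
	"slices"

	"github.com/maypok86/otter/v2"
)

func main() {
	cache := otter.Must(&otter.Options[string, int]{MaximumSize: 100})
	cache.Set("a", 1)
	cache.Set("b", 2)
	cache.Set("c", 3)

	// Keys and Values are weakly consistent and unordered.
	for k := range cache.Keys() {
		fmt.Println("key:", k)
	}
	values := slices.Collect(cache.Values()) // standard iterator helper
	fmt.Println(values)

	// Hottest runs under the eviction policy's exclusive lock,
	// so keep the loop body short and break early.
	top := make([]string, 0, 2)
	for e := range cache.Hottest() {
		top = append(top, e.Key)
		if len(top) == 2 {
			break
		}
	}
	fmt.Println("hottest:", top)
}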

File diff suppressed because it is too large.

vendor/github.com/maypok86/otter/v2/clock.go (new file, generated, vendored; 232 lines)

@@ -0,0 +1,232 @@
// Copyright (c) 2025 Alexey Mayshev and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otter
import (
"sync"
"sync/atomic"
"time"
"github.com/maypok86/otter/v2/internal/xmath"
)
// Clock is a time source that
// - Returns a time value representing the number of nanoseconds elapsed since some
// fixed but arbitrary point in time
// - Returns a channel that delivers “ticks” of a clock at intervals.
type Clock interface {
// NowNano returns the number of nanoseconds elapsed since this clock's fixed point of reference.
//
// By default, time.Now().UnixNano() is used.
NowNano() int64
// Tick returns a channel that delivers “ticks” of a clock at intervals.
//
// The cache uses this method only for proactive expiration and calls Tick(time.Second) in a separate goroutine.
//
// By default, [time.Tick] is used.
Tick(duration time.Duration) <-chan time.Time
}
type timeSource interface {
Clock
Init()
Sleep(duration time.Duration)
ProcessTick()
}
func newTimeSource(clock Clock) timeSource {
if clock == nil {
return &realSource{}
}
if r, ok := clock.(*realSource); ok {
return r
}
if f, ok := clock.(*fakeSource); ok {
return f
}
return newCustomSource(clock)
}
type customSource struct {
clock Clock
isInitialized atomic.Bool
}
func newCustomSource(clock Clock) *customSource {
return &customSource{
clock: clock,
}
}
func (cs *customSource) Init() {
if !cs.isInitialized.Load() {
cs.isInitialized.Store(true)
}
}
func (cs *customSource) NowNano() int64 {
if !cs.isInitialized.Load() {
return 0
}
return cs.clock.NowNano()
}
func (cs *customSource) Tick(duration time.Duration) <-chan time.Time {
return cs.clock.Tick(duration)
}
func (cs *customSource) Sleep(duration time.Duration) {
time.Sleep(duration)
}
func (cs *customSource) ProcessTick() {}
type realSource struct {
initMutex sync.Mutex
isInitialized atomic.Bool
start time.Time
startNanos atomic.Int64
}
func (c *realSource) Init() {
if !c.isInitialized.Load() {
c.initMutex.Lock()
if !c.isInitialized.Load() {
now := time.Now()
c.start = now
c.startNanos.Store(now.UnixNano())
c.isInitialized.Store(true)
}
c.initMutex.Unlock()
}
}
func (c *realSource) NowNano() int64 {
if !c.isInitialized.Load() {
return 0
}
return xmath.SaturatedAdd(c.startNanos.Load(), time.Since(c.start).Nanoseconds())
}
func (c *realSource) Tick(duration time.Duration) <-chan time.Time {
return time.Tick(duration)
}
func (c *realSource) Sleep(duration time.Duration) {
time.Sleep(duration)
}
func (c *realSource) ProcessTick() {}
type fakeSource struct {
mutex sync.Mutex
now time.Time
initOnce sync.Once
sleeps chan time.Duration
tickWg sync.WaitGroup
sleepWg sync.WaitGroup
firstSleep atomic.Bool
withTick atomic.Bool
ticker chan time.Time
enableTickOnce sync.Once
enableTick chan time.Duration
}
func (f *fakeSource) Init() {
f.initOnce.Do(func() {
f.mutex.Lock()
now := time.Now()
f.now = now
f.sleeps = make(chan time.Duration)
f.firstSleep.Store(true)
f.enableTick = make(chan time.Duration)
f.ticker = make(chan time.Time, 1)
f.mutex.Unlock()
go func() {
var (
dur time.Duration
d time.Duration
)
enabled := false
last := now
for {
select {
case d = <-f.enableTick:
enabled = true
for d <= dur {
if f.firstSleep.Load() {
f.tickWg.Add(1)
f.ticker <- last
f.tickWg.Wait()
f.firstSleep.Store(false)
}
last = last.Add(d)
f.tickWg.Add(1)
f.ticker <- last
dur -= d
}
case s := <-f.sleeps:
if enabled && f.firstSleep.Load() {
f.tickWg.Add(1)
f.ticker <- last
f.tickWg.Wait()
f.firstSleep.Store(false)
}
f.mutex.Lock()
f.now = f.now.Add(s)
f.mutex.Unlock()
dur += s
if enabled {
for d <= dur {
last = last.Add(d)
f.tickWg.Add(1)
f.ticker <- last
dur -= d
}
}
f.sleepWg.Done()
}
}
}()
})
}
func (f *fakeSource) NowNano() int64 {
return f.getNow().UnixNano()
}
func (f *fakeSource) Tick(d time.Duration) <-chan time.Time {
f.enableTickOnce.Do(func() {
f.enableTick <- d
})
return f.ticker
}
func (f *fakeSource) Sleep(d time.Duration) {
f.sleepWg.Add(1)
f.sleeps <- d
f.sleepWg.Wait()
}
func (f *fakeSource) getNow() time.Time {
f.mutex.Lock()
defer f.mutex.Unlock()
return f.now
}
func (f *fakeSource) ProcessTick() {
f.tickWg.Done()
}
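Given the Clock interface above, a test double can be as small as the sketch below; the testClock type is illustrative, not part of otter. It would be wired in via the Clock field on Options (see the options.go hunk later in this diff).

package main

import (
	"fmt"
	"sync"
	"time"
)

// testClock is a hand-rolled fake Clock: Advance moves time
// forward without sleeping.
type testClock struct {
	mu  sync.Mutex
	now int64 // nanoseconds since an arbitrary origin
}

func (c *testClock) NowNano() int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

// Tick satisfies the interface; tests that do not exercise proactive
// expiration can return a channel that never fires.
func (c *testClock) Tick(d time.Duration) <-chan time.Time {
	return make(chan time.Time)
}

func (c *testClock) Advance(d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.now += int64(d)
}

func main() {
	clk := &testClock{}
	fmt.Println(clk.NowNano()) // 0
	clk.Advance(time.Second)
	fmt.Println(clk.NowNano()) // 1000000000
}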


@@ -1,57 +0,0 @@
// Copyright (c) 2024 Alexey Mayshev and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clock
import (
"math"
"sync"
"sync/atomic"
"time"
)
type Real struct {
initMutex sync.Mutex
isInitialized atomic.Bool
start time.Time
startNanos atomic.Int64
}
func (c *Real) Init() {
if !c.isInitialized.Load() {
c.initMutex.Lock()
if !c.isInitialized.Load() {
now := time.Now()
c.start = now
c.startNanos.Store(now.UnixNano())
c.isInitialized.Store(true)
}
c.initMutex.Unlock()
}
}
func (c *Real) Offset() int64 {
if !c.isInitialized.Load() {
return 0
}
return saturatedAdd(c.startNanos.Load(), time.Since(c.start).Nanoseconds())
}
func saturatedAdd(a, b int64) int64 {
s := a + b
if s < a || s < b {
return math.MaxInt64
}
return s
}


@@ -15,6 +15,8 @@
package deque
import (
"iter"
"github.com/maypok86/otter/v2/internal/generated/node"
)
@@ -172,6 +174,30 @@ func (d *Linked[K, V]) Tail() node.Node[K, V] {
return d.tail
}
func (d *Linked[K, V]) All() iter.Seq[node.Node[K, V]] {
return func(yield func(node.Node[K, V]) bool) {
cursor := d.head
for !node.Equals(cursor, nil) {
if !yield(cursor) {
return
}
cursor = d.getNext(cursor)
}
}
}
func (d *Linked[K, V]) Backward() iter.Seq[node.Node[K, V]] {
return func(yield func(node.Node[K, V]) bool) {
cursor := d.tail
for !node.Equals(cursor, nil) {
if !yield(cursor) {
return
}
cursor = d.getPrev(cursor)
}
}
}
func (d *Linked[K, V]) setPrev(to, n node.Node[K, V]) {
if d.isExp {
to.SetPrevExp(n)


@@ -54,7 +54,7 @@ func NewVariable[K comparable, V any](nodeManager *node.Manager[K, V]) *Variable
for j := 0; j < len(wheel[i]); j++ {
var k K
var v V
fn := nodeManager.Create(k, v, math.MaxUint32, math.MaxUint32, 1)
fn := nodeManager.Create(k, v, math.MaxInt64, math.MaxInt64, 1)
fn.SetPrevExp(fn)
fn.SetNextExp(fn)
wheel[i][j] = fn
@@ -94,7 +94,6 @@ func (v *Variable[K, V]) Delete(n node.Node[K, V]) {
}
func (v *Variable[K, V]) DeleteExpired(nowNanos int64, expireNode func(n node.Node[K, V], nowNanos int64)) {
//nolint:gosec // there is no overflow
currentTime := uint64(nowNanos)
prevTime := v.time
v.time = currentTime
@@ -117,10 +116,7 @@ func (v *Variable[K, V]) deleteExpiredFromBucket(
expireNode func(n node.Node[K, V], nowNanos int64),
) {
mask := buckets[index] - 1
steps := buckets[index]
if delta < steps {
steps = delta
}
steps := min(delta+1, buckets[index])
start := prevTicks & mask
end := start + steps
timerWheel := v.wheel[index]
@@ -135,8 +131,7 @@ func (v *Variable[K, V]) deleteExpiredFromBucket(
n.SetPrevExp(nil)
n.SetNextExp(nil)
//nolint:gosec // there is no overflow
if uint64(n.ExpiresAt()) <= v.time {
if uint64(n.ExpiresAt()) < v.time {
expireNode(n, int64(v.time))
} else {
v.Add(n)
@@ -147,25 +142,6 @@ func (v *Variable[K, V]) deleteExpiredFromBucket(
}
}
/*
func (v *Variable[K, V]) Clear() {
for i := 0; i < len(v.wheel); i++ {
for j := 0; j < len(v.wheel[i]); j++ {
root := v.wheel[i][j]
n := root.NextExp()
// NOTE(maypok86): Maybe we should use the same approach as in DeleteExpired?
for !node.Equals(n, root) {
next := n.NextExp()
v.Delete(n)
n = next
}
}
}
}
*/
// link adds the entry at the tail of the bucket's list.
func link[K comparable, V any](root, n node.Node[K, V]) {
n.SetPrevExp(root.PrevExp())
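The two changed hunks above tighten the per-level scan: steps grows from min(delta, buckets[index]) to min(delta+1, buckets[index]), since an inclusive range of ticks from the previous to the current one spans delta+1 buckets, and the expiry comparison becomes strict, so entries due exactly at the current time are re-added rather than expired. A toy illustration of just the scan arithmetic (not the real wheel):

package main

import "fmt"

// bucketsToScan mirrors the new steps computation: with delta ticks
// elapsed at this wheel level, the buckets of both the previous and
// the current tick must be visited, i.e. delta+1 of them, capped at
// a full rotation.
func bucketsToScan(delta, bucketCount uint64) uint64 {
	return min(delta+1, bucketCount)
}

func main() {
	fmt.Println(bucketsToScan(0, 64))  // 1: the current bucket is always checked
	fmt.Println(bucketsToScan(7, 64))  // 8
	fmt.Println(bucketsToScan(99, 64)) // 64: a full wheel rotation
}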


@@ -28,7 +28,6 @@ type BERW[K comparable, V any] struct {
refreshableAt atomic.Int64
weight uint32
state atomic.Uint32
frequency uint8
queueType uint8
}


@@ -25,7 +25,6 @@ type BEW[K comparable, V any] struct {
expiresAt atomic.Int64
weight uint32
state atomic.Uint32
frequency uint8
queueType uint8
}


@@ -23,7 +23,6 @@ type BRW[K comparable, V any] struct {
refreshableAt atomic.Int64
weight uint32
state atomic.Uint32
frequency uint8
queueType uint8
}


@@ -19,7 +19,6 @@ type BS[K comparable, V any] struct {
prev *BS[K, V]
next *BS[K, V]
state atomic.Uint32
frequency uint8
queueType uint8
}


@@ -24,7 +24,6 @@ type BSE[K comparable, V any] struct {
nextExp *BSE[K, V]
expiresAt atomic.Int64
state atomic.Uint32
frequency uint8
queueType uint8
}


@@ -27,7 +27,6 @@ type BSER[K comparable, V any] struct {
expiresAt atomic.Int64
refreshableAt atomic.Int64
state atomic.Uint32
frequency uint8
queueType uint8
}


@@ -22,7 +22,6 @@ type BSR[K comparable, V any] struct {
next *BSR[K, V]
refreshableAt atomic.Int64
state atomic.Uint32
frequency uint8
queueType uint8
}


@@ -20,7 +20,6 @@ type BW[K comparable, V any] struct {
next *BW[K, V]
weight uint32
state atomic.Uint32
frequency uint8
queueType uint8
}


@@ -0,0 +1,63 @@
// Copyright (c) 2025 Alexey Mayshev and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xiter
import "iter"
// Concat returns an iterator over the concatenation of the sequences.
func Concat[V any](seqs ...iter.Seq[V]) iter.Seq[V] {
return func(yield func(V) bool) {
for _, seq := range seqs {
for e := range seq {
if !yield(e) {
return
}
}
}
}
}
// MergeFunc merges two sequences of values ordered by the function f.
// Values appear in the output once for each time they appear in x
// and once for each time they appear in y.
// When equal values appear in both sequences,
// the output contains the values from x before the values from y.
// If the two input sequences are not ordered by f,
// the output sequence will not be ordered by f,
// but it will still contain every value from x and y exactly once.
func MergeFunc[V any](x, y iter.Seq[V], f func(V, V) int) iter.Seq[V] {
return func(yield func(V) bool) {
next, stop := iter.Pull(y)
defer stop()
v2, ok2 := next()
for v1 := range x {
for ok2 && f(v1, v2) > 0 {
if !yield(v2) {
return
}
v2, ok2 = next()
}
if !yield(v1) {
return
}
}
for ok2 {
if !yield(v2) {
return
}
v2, ok2 = next()
}
}
}
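Concat and MergeFunc mirror the x/exp iterator proposal. Since the package is internal to otter, the sketch below is written as it might appear in an in-tree example test; slices.Values and cmp.Compare are standard library helpers.

package xiter_test

import (
	"cmp"
	"fmt"
	"slices"

	"github.com/maypok86/otter/v2/internal/xiter"
)

func ExampleMergeFunc() {
	x := slices.Values([]int{1, 3, 5})
	y := slices.Values([]int{2, 3, 4})

	// Concatenation preserves input order: 1 3 5 2 3 4
	for v := range xiter.Concat(x, y) {
		fmt.Print(v, " ")
	}
	fmt.Println()

	// Merge keeps equal values once per input, x's copy first: 1 2 3 3 4 5
	for v := range xiter.MergeFunc(x, y, cmp.Compare) {
		fmt.Print(v, " ")
	}
	fmt.Println()
}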


@@ -14,6 +14,8 @@
package xmath
import "math"
func Abs(a int64) int64 {
if a < 0 {
return -a
@@ -50,3 +52,11 @@ func RoundUpPowerOf264(x uint64) uint64 {
x++
return x
}
func SaturatedAdd(a, b int64) int64 {
s := a + b
if s < a || s < b {
return math.MaxInt64
}
return s
}
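A quick demonstration of the clamping behavior; saturatedAdd is copied below only because xmath is internal. For the non-negative operands the cache feeds it (an epoch plus an elapsed duration), an overflowing sum clamps to math.MaxInt64 instead of wrapping negative.

package main

import (
	"fmt"
	"math"
)

// saturatedAdd mirrors xmath.SaturatedAdd above.
func saturatedAdd(a, b int64) int64 {
	s := a + b
	if s < a || s < b { // wrapped around: clamp
		return math.MaxInt64
	}
	return s
}

func main() {
	fmt.Println(saturatedAdd(1, 2))               // 3
	fmt.Println(saturatedAdd(math.MaxInt64-1, 5)) // 9223372036854775807 (clamped)
}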


@@ -73,7 +73,7 @@ extra:
- icon: fontawesome/brands/github
link: https://github.com/maypok86
- icon: fontawesome/brands/golang
link: https://pkg.go.dev/github.com/maypok86/otter/
link: https://pkg.go.dev/github.com/maypok86/otter/v2
# Extensions
markdown_extensions:
@@ -138,7 +138,9 @@ nav:
- Loading: user-guide/v2/features/loading.md
- Refresh: user-guide/v2/features/refresh.md
- Bulk operations: user-guide/v2/features/bulk.md
- Compute: user-guide/v2/features/compute.md
- Statistics: user-guide/v2/features/statistics.md
- Persistence: user-guide/v2/features/persistence.md
- Extension: user-guide/v2/features/extension.md
- Iteration: user-guide/v2/features/iteration.md
- v1 manual:


@@ -137,6 +137,14 @@ type Options[K comparable, V any] struct {
// Beware that configuring a cache with an executor that discards tasks or never runs them may
// experience non-deterministic behavior.
Executor func(fn func())
// Clock specifies a nanosecond-precision time source for use in determining when entries should be
// expired or refreshed. By default, time.Now().UnixNano() is used.
//
// The primary intent of this option is to facilitate testing of caches which have been configured
// with ExpiryCalculator or RefreshCalculator.
//
// NOTE: this clock is not used when recording statistics.
Clock Clock
// Logger specifies the Logger implementation that will be used for logging warning and errors.
//
// The cache will use slog.Default() by default.

vendor/github.com/maypok86/otter/v2/persistence.go (new file, generated, vendored; 155 lines)

@@ -0,0 +1,155 @@
// Copyright (c) 2025 Alexey Mayshev and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otter
import (
"encoding/gob"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"
)
// LoadCacheFromFile loads cache data from the given filePath.
//
// See SaveCacheToFile for saving cache data to file.
func LoadCacheFromFile[K comparable, V any](c *Cache[K, V], filePath string) error {
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("otter: open file %s: %w", filePath, err)
}
//nolint:errcheck // it's ok
defer file.Close()
return LoadCacheFrom(c, file)
}
// LoadCacheFrom loads cache data from the given [io.Reader].
//
// See SaveCacheToFile for saving cache data to file.
func LoadCacheFrom[K comparable, V any](c *Cache[K, V], r io.Reader) error {
dec := gob.NewDecoder(r)
var savedMaximum uint64
if err := dec.Decode(&savedMaximum); err != nil {
return fmt.Errorf("otter: decode maximum: %w", err)
}
maximum := min(savedMaximum, c.GetMaximum())
maximum2 := maximum / 4
maximum1 := 2 * maximum2
size := uint64(0)
for size < maximum {
var entry Entry[K, V]
if err := dec.Decode(&entry); err != nil {
if errors.Is(err, io.EOF) {
break
}
return fmt.Errorf("otter: decode entry: %w", err)
}
nowNano := c.cache.clock.NowNano()
if c.cache.withExpiration && entry.ExpiresAtNano < nowNano {
continue
}
c.Set(entry.Key, entry.Value)
if c.cache.withExpiration && entry.ExpiresAtNano != unreachableExpiresAt {
expiresAfter := max(1, time.Duration(entry.ExpiresAtNano-nowNano))
c.SetExpiresAfter(entry.Key, expiresAfter)
}
if c.cache.withRefresh && entry.RefreshableAtNano != unreachableRefreshableAt {
refreshableAfter := max(1, time.Duration(entry.RefreshableAtNano-nowNano))
c.SetRefreshableAfter(entry.Key, refreshableAfter)
}
size += uint64(entry.Weight)
if size <= maximum2 {
c.GetIfPresent(entry.Key)
c.GetIfPresent(entry.Key)
continue
}
if size <= maximum1 {
c.GetIfPresent(entry.Key)
continue
}
}
return nil
}
// SaveCacheToFile atomically saves cache data to the given filePath.
//
// SaveCacheToFile may be called concurrently with other operations on the cache.
//
// The saved data may be loaded with LoadCacheFromFile.
//
// WARNING: Beware that this operation is performed within the eviction policy's exclusive lock.
// While the operation is in progress further eviction maintenance will be halted.
func SaveCacheToFile[K comparable, V any](c *Cache[K, V], filePath string) error {
// Create dir if it doesn't exist.
dir := filepath.Dir(filePath)
if _, err := os.Stat(dir); err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("otter: stat %s: %w", dir, err)
}
if err := os.MkdirAll(dir, 0o755); err != nil {
return fmt.Errorf("otter: create dir %s: %w", dir, err)
}
}
file, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("otter: create file %s: %w", filePath, err)
}
//nolint:errcheck // it's ok
defer file.Close()
return SaveCacheTo(c, file)
}
// SaveCacheTo atomically saves cache data to the given [io.Writer].
//
// SaveCacheTo may be called concurrently with other operations on the cache.
//
// The saved data may be loaded with LoadCacheFrom.
//
// WARNING: Beware that this operation is performed within the eviction policy's exclusive lock.
// While the operation is in progress further eviction maintenance will be halted.
func SaveCacheTo[K comparable, V any](c *Cache[K, V], w io.Writer) error {
enc := gob.NewEncoder(w)
maximum := c.GetMaximum()
if err := enc.Encode(maximum); err != nil {
return fmt.Errorf("otter: encode maximum: %w", err)
}
size := uint64(0)
for entry := range c.Hottest() {
if size >= maximum {
break
}
if err := enc.Encode(entry); err != nil {
return fmt.Errorf("otter: encode entry: %w", err)
}
size += uint64(entry.Weight)
}
return nil
}
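A round-trip sketch of the new persistence helpers; the file path and capacity are illustrative, and the cache construction via otter.Must follows the README example.

package main

import (
	"fmt"

	"github.com/maypok86/otter/v2"
)

func main() {
	cache := otter.Must(&otter.Options[string, string]{MaximumSize: 1_000})
	cache.Set("greeting", "hello")

	// Snapshot the cache, hottest entries first.
	if err := otter.SaveCacheToFile(cache, "/tmp/cache.gob"); err != nil {
		panic(err)
	}

	// Warm a fresh cache from the snapshot; already-expired entries
	// are skipped on load, per LoadCacheFrom above.
	restored := otter.Must(&otter.Options[string, string]{MaximumSize: 1_000})
	if err := otter.LoadCacheFromFile(restored, "/tmp/cache.gob"); err != nil {
		panic(err)
	}
	v, _ := restored.GetIfPresent("greeting")
	fmt.Println(v) // hello
}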


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tinylfu
package otter
import (
"github.com/maypok86/otter/v2/internal/deque"
@@ -25,83 +25,83 @@ const (
// The initial percent of the maximum weighted capacity dedicated to the main space.
percentMain = 0.99
// PercentMainProtected is the percent of the maximum weighted capacity dedicated to the main's protected space.
PercentMainProtected = 0.80
// percentMainProtected is the percent of the maximum weighted capacity dedicated to the main's protected space.
percentMainProtected = 0.80
// The difference in hit rates that restarts the climber.
hillClimberRestartThreshold = 0.05
// The percent of the total size to adapt the window by.
hillClimberStepPercent = 0.0625
// The rate to decrease the step size to adapt by.
hillClimberStepDecayRate = 0.98
// AdmitHashdosThreshold is the minimum popularity for allowing randomized admission.
AdmitHashdosThreshold = 6
// admitHashdosThreshold is the minimum popularity for allowing randomized admission.
admitHashdosThreshold = 6
// The maximum number of entries that can be transferred between queues.
queueTransferThreshold = 1_000
)
type Policy[K comparable, V any] struct {
Sketch *Sketch[K]
Window *deque.Linked[K, V]
Probation *deque.Linked[K, V]
Protected *deque.Linked[K, V]
Maximum uint64
WeightedSize uint64
WindowMaximum uint64
WindowWeightedSize uint64
MainProtectedMaximum uint64
MainProtectedWeightedSize uint64
StepSize float64
Adjustment int64
HitsInSample uint64
MissesInSample uint64
PreviousSampleHitRate float64
IsWeighted bool
Rand func() uint32
type policy[K comparable, V any] struct {
sketch *sketch[K]
window *deque.Linked[K, V]
probation *deque.Linked[K, V]
protected *deque.Linked[K, V]
maximum uint64
weightedSize uint64
windowMaximum uint64
windowWeightedSize uint64
mainProtectedMaximum uint64
mainProtectedWeightedSize uint64
stepSize float64
adjustment int64
hitsInSample uint64
missesInSample uint64
previousSampleHitRate float64
isWeighted bool
rand func() uint32
}
func NewPolicy[K comparable, V any](isWeighted bool) *Policy[K, V] {
return &Policy[K, V]{
Sketch: newSketch[K](),
Window: deque.NewLinked[K, V](isExp),
Probation: deque.NewLinked[K, V](isExp),
Protected: deque.NewLinked[K, V](isExp),
IsWeighted: isWeighted,
Rand: xruntime.Fastrand,
func newPolicy[K comparable, V any](isWeighted bool) *policy[K, V] {
return &policy[K, V]{
sketch: newSketch[K](),
window: deque.NewLinked[K, V](isExp),
probation: deque.NewLinked[K, V](isExp),
protected: deque.NewLinked[K, V](isExp),
isWeighted: isWeighted,
rand: xruntime.Fastrand,
}
}
// Access updates the eviction policy based on node accesses.
func (p *Policy[K, V]) Access(n node.Node[K, V]) {
p.Sketch.Increment(n.Key())
// access updates the eviction policy based on node accesses.
func (p *policy[K, V]) access(n node.Node[K, V]) {
p.sketch.increment(n.Key())
switch {
case n.InWindow():
reorder(p.Window, n)
reorder(p.window, n)
case n.InMainProbation():
p.reorderProbation(n)
case n.InMainProtected():
reorder(p.Protected, n)
reorder(p.protected, n)
}
p.HitsInSample++
p.hitsInSample++
}
// Add adds node to the eviction policy.
func (p *Policy[K, V]) Add(n node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) {
// add adds node to the eviction policy.
func (p *policy[K, V]) add(n node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) {
nodeWeight := uint64(n.Weight())
p.WeightedSize += nodeWeight
p.WindowWeightedSize += nodeWeight
if p.WeightedSize >= p.Maximum>>1 {
p.weightedSize += nodeWeight
p.windowWeightedSize += nodeWeight
if p.weightedSize >= p.maximum>>1 {
// Lazily initialize when close to the maximum
capacity := p.Maximum
if p.IsWeighted {
capacity := p.maximum
if p.isWeighted {
//nolint:gosec // there's no overflow
capacity = uint64(p.Window.Len()) + uint64(p.Probation.Len()) + uint64(p.Protected.Len())
capacity = uint64(p.window.Len()) + uint64(p.probation.Len()) + uint64(p.protected.Len())
}
p.Sketch.EnsureCapacity(capacity)
p.sketch.ensureCapacity(capacity)
}
p.Sketch.Increment(n.Key())
p.MissesInSample++
p.sketch.increment(n.Key())
p.missesInSample++
// ignore out-of-order write operations
if !n.IsAlive() {
@@ -109,144 +109,140 @@ func (p *Policy[K, V]) Add(n node.Node[K, V], evictNode func(n node.Node[K, V],
}
switch {
case nodeWeight > p.Maximum:
case nodeWeight > p.maximum:
evictNode(n, 0)
case nodeWeight > p.WindowMaximum:
p.Window.PushFront(n)
case nodeWeight > p.windowMaximum:
p.window.PushFront(n)
default:
p.Window.PushBack(n)
p.window.PushBack(n)
}
}
func (p *Policy[K, V]) Update(n, old node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) {
func (p *policy[K, V]) update(n, old node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) {
nodeWeight := uint64(n.Weight())
p.updateNode(n, old)
switch {
case n.InWindow():
p.WindowWeightedSize += nodeWeight
p.windowWeightedSize += nodeWeight
switch {
case nodeWeight > p.Maximum:
case nodeWeight > p.maximum:
evictNode(n, 0)
case nodeWeight <= p.WindowMaximum:
p.Access(n)
case p.Window.Contains(n):
p.Window.MoveToFront(n)
case nodeWeight <= p.windowMaximum:
p.access(n)
case p.window.Contains(n):
p.window.MoveToFront(n)
}
case n.InMainProbation():
if nodeWeight <= p.Maximum {
p.Access(n)
if nodeWeight <= p.maximum {
p.access(n)
} else {
evictNode(n, 0)
}
case n.InMainProtected():
p.MainProtectedWeightedSize += nodeWeight
if nodeWeight <= p.Maximum {
p.Access(n)
p.mainProtectedWeightedSize += nodeWeight
if nodeWeight <= p.maximum {
p.access(n)
} else {
evictNode(n, 0)
}
}
p.WeightedSize += nodeWeight
p.weightedSize += nodeWeight
}
func (p *Policy[K, V]) updateNode(n, old node.Node[K, V]) {
func (p *policy[K, V]) updateNode(n, old node.Node[K, V]) {
n.SetQueueType(old.GetQueueType())
switch {
case n.InWindow():
p.Window.UpdateNode(n, old)
p.window.UpdateNode(n, old)
case n.InMainProbation():
p.Probation.UpdateNode(n, old)
p.probation.UpdateNode(n, old)
default:
p.Protected.UpdateNode(n, old)
p.protected.UpdateNode(n, old)
}
p.MakeDead(old)
p.makeDead(old)
}
// Delete deletes node from the eviction policy.
func (p *Policy[K, V]) Delete(n node.Node[K, V]) {
// delete deletes node from the eviction policy.
func (p *policy[K, V]) delete(n node.Node[K, V]) {
// add may not have been processed yet
switch {
case n.InWindow():
p.Window.Delete(n)
p.window.Delete(n)
case n.InMainProbation():
p.Probation.Delete(n)
p.probation.Delete(n)
default:
p.Protected.Delete(n)
p.protected.Delete(n)
}
p.MakeDead(n)
p.makeDead(n)
}
func (p *Policy[K, V]) MakeDead(n node.Node[K, V]) {
func (p *policy[K, V]) makeDead(n node.Node[K, V]) {
if !n.IsDead() {
nodeWeight := uint64(n.Weight())
if n.InWindow() {
p.WindowWeightedSize -= nodeWeight
p.windowWeightedSize -= nodeWeight
} else if n.InMainProtected() {
p.MainProtectedWeightedSize -= nodeWeight
p.mainProtectedWeightedSize -= nodeWeight
}
p.WeightedSize -= nodeWeight
p.weightedSize -= nodeWeight
n.Die()
}
}
func (p *Policy[K, V]) SetMaximumSize(maximum uint64) {
if maximum == p.Maximum {
func (p *policy[K, V]) setMaximumSize(maximum uint64) {
if maximum == p.maximum {
return
}
window := maximum - uint64(percentMain*float64(maximum))
mainProtected := uint64(PercentMainProtected * float64(maximum-window))
mainProtected := uint64(percentMainProtected * float64(maximum-window))
p.Maximum = maximum
p.WindowMaximum = window
p.MainProtectedMaximum = mainProtected
p.maximum = maximum
p.windowMaximum = window
p.mainProtectedMaximum = mainProtected
p.HitsInSample = 0
p.MissesInSample = 0
p.StepSize = -hillClimberStepPercent * float64(maximum)
p.hitsInSample = 0
p.missesInSample = 0
p.stepSize = -hillClimberStepPercent * float64(maximum)
if p.Sketch != nil && !p.IsWeighted && p.WeightedSize >= (maximum>>1) {
// Lazily initialize when close to the maximum Size
p.Sketch.EnsureCapacity(maximum)
if p.sketch != nil && !p.isWeighted && p.weightedSize >= (maximum>>1) {
// Lazily initialize when close to the maximum size
p.sketch.ensureCapacity(maximum)
}
}
func (p *Policy[K, V]) EnsureCapacity(capacity uint64) {
p.Sketch.EnsureCapacity(capacity)
}
// Promote the node from probation to protected on access.
func (p *Policy[K, V]) reorderProbation(n node.Node[K, V]) {
func (p *policy[K, V]) reorderProbation(n node.Node[K, V]) {
nodeWeight := uint64(n.Weight())
if p.Probation.NotContains(n) {
if p.probation.NotContains(n) {
// Ignore stale accesses for an entry that is no longer present
return
} else if nodeWeight > p.MainProtectedMaximum {
reorder(p.Probation, n)
} else if nodeWeight > p.mainProtectedMaximum {
reorder(p.probation, n)
return
}
// If the protected space exceeds its maximum, the LRU items are demoted to the probation space.
// This is deferred to the adaption phase at the end of the maintenance cycle.
p.MainProtectedWeightedSize += nodeWeight
p.Probation.Delete(n)
p.Protected.PushBack(n)
p.mainProtectedWeightedSize += nodeWeight
p.probation.Delete(n)
p.protected.PushBack(n)
n.MakeMainProtected()
}
func (p *Policy[K, V]) EvictNodes(evictNode func(n node.Node[K, V], nowNanos int64)) {
candidate := p.EvictFromWindow()
func (p *policy[K, V]) evictNodes(evictNode func(n node.Node[K, V], nowNanos int64)) {
candidate := p.evictFromWindow()
p.evictFromMain(candidate, evictNode)
}
func (p *Policy[K, V]) EvictFromWindow() node.Node[K, V] {
func (p *policy[K, V]) evictFromWindow() node.Node[K, V] {
var first node.Node[K, V]
n := p.Window.Head()
for p.WindowWeightedSize > p.WindowMaximum {
// The pending operations will adjust the Size to reflect the correct weight
n := p.window.Head()
for p.windowWeightedSize > p.windowMaximum {
// The pending operations will adjust the size to reflect the correct weight
if node.Equals(n, nil) {
break
}
@@ -255,43 +251,43 @@ func (p *Policy[K, V]) EvictFromWindow() node.Node[K, V] {
nodeWeight := uint64(n.Weight())
if nodeWeight != 0 {
n.MakeMainProbation()
p.Window.Delete(n)
p.Probation.PushBack(n)
p.window.Delete(n)
p.probation.PushBack(n)
if first == nil {
first = n
}
p.WindowWeightedSize -= nodeWeight
p.windowWeightedSize -= nodeWeight
}
n = next
}
return first
}
func (p *Policy[K, V]) evictFromMain(candidate node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) {
func (p *policy[K, V]) evictFromMain(candidate node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) {
victimQueue := node.InMainProbationQueue
candidateQueue := node.InMainProbationQueue
victim := p.Probation.Head()
for p.WeightedSize > p.Maximum {
victim := p.probation.Head()
for p.weightedSize > p.maximum {
// Search the admission window for additional candidates
if node.Equals(candidate, nil) && candidateQueue == node.InMainProbationQueue {
candidate = p.Window.Head()
candidate = p.window.Head()
candidateQueue = node.InWindowQueue
}
// Try evicting from the protected and window queues
if node.Equals(candidate, nil) && node.Equals(victim, nil) {
if victimQueue == node.InMainProbationQueue {
victim = p.Protected.Head()
victim = p.protected.Head()
victimQueue = node.InMainProtectedQueue
continue
} else if victimQueue == node.InMainProtectedQueue {
victim = p.Window.Head()
victim = p.window.Head()
victimQueue = node.InWindowQueue
continue
}
// The pending operations will adjust the Size to reflect the correct weight
// The pending operations will adjust the size to reflect the correct weight
break
}
@@ -340,7 +336,7 @@ func (p *Policy[K, V]) evictFromMain(candidate node.Node[K, V], evictNode func(n
}
// Evict immediately if the candidate's weight exceeds the maximum
if uint64(candidate.Weight()) > p.Maximum {
if uint64(candidate.Weight()) > p.maximum {
evict := candidate
candidate = candidate.Next()
evictNode(evict, 0)
@@ -348,7 +344,7 @@ func (p *Policy[K, V]) evictFromMain(candidate node.Node[K, V], evictNode func(n
}
// Evict the entry with the lowest frequency
if p.Admit(candidate.Key(), victim.Key()) {
if p.admit(candidate.Key(), victim.Key()) {
evict := victim
victim = victim.Next()
evictNode(evict, 0)
@@ -361,25 +357,25 @@ func (p *Policy[K, V]) evictFromMain(candidate node.Node[K, V], evictNode func(n
}
}
func (p *Policy[K, V]) Admit(candidateKey, victimKey K) bool {
victimFreq := p.Sketch.Frequency(victimKey)
candidateFreq := p.Sketch.Frequency(candidateKey)
func (p *policy[K, V]) admit(candidateKey, victimKey K) bool {
victimFreq := p.sketch.frequency(victimKey)
candidateFreq := p.sketch.frequency(candidateKey)
if candidateFreq > victimFreq {
return true
}
if candidateFreq >= AdmitHashdosThreshold {
if candidateFreq >= admitHashdosThreshold {
// The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack
// exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm
// candidate reduces the number of random acceptances to minimize the impact on the hit rate.
return (p.Rand() & 127) == 0
return (p.rand() & 127) == 0
}
return false
}
func (p *Policy[K, V]) Climb() {
func (p *policy[K, V]) climb() {
p.determineAdjustment()
p.demoteFromMainProtected()
amount := p.Adjustment
amount := p.adjustment
if amount == 0 {
return
}
@@ -390,24 +386,24 @@ func (p *Policy[K, V]) Climb() {
}
}
func (p *Policy[K, V]) determineAdjustment() {
if p.Sketch.IsNotInitialized() {
p.PreviousSampleHitRate = 0.0
p.MissesInSample = 0
p.HitsInSample = 0
func (p *policy[K, V]) determineAdjustment() {
if p.sketch.isNotInitialized() {
p.previousSampleHitRate = 0.0
p.missesInSample = 0
p.hitsInSample = 0
return
}
requestCount := p.HitsInSample + p.MissesInSample
if requestCount < p.Sketch.SampleSize {
requestCount := p.hitsInSample + p.missesInSample
if requestCount < p.sketch.sampleSize {
return
}
hitRate := float64(p.HitsInSample) / float64(requestCount)
hitRateChange := hitRate - p.PreviousSampleHitRate
amount := p.StepSize
hitRate := float64(p.hitsInSample) / float64(requestCount)
hitRateChange := hitRate - p.previousSampleHitRate
amount := p.stepSize
if hitRateChange < 0 {
amount = -p.StepSize
amount = -p.stepSize
}
var nextStepSize float64
if abs(hitRateChange) >= hillClimberRestartThreshold {
@@ -415,20 +411,20 @@ func (p *Policy[K, V]) determineAdjustment() {
if amount >= 0 {
k = float64(1)
}
nextStepSize = hillClimberStepPercent * float64(p.Maximum) * k
nextStepSize = hillClimberStepPercent * float64(p.maximum) * k
} else {
nextStepSize = hillClimberStepDecayRate * amount
}
p.PreviousSampleHitRate = hitRate
p.Adjustment = int64(amount)
p.StepSize = nextStepSize
p.MissesInSample = 0
p.HitsInSample = 0
p.previousSampleHitRate = hitRate
p.adjustment = int64(amount)
p.stepSize = nextStepSize
p.missesInSample = 0
p.hitsInSample = 0
}
func (p *Policy[K, V]) demoteFromMainProtected() {
mainProtectedMaximum := p.MainProtectedMaximum
mainProtectedWeightedSize := p.MainProtectedWeightedSize
func (p *policy[K, V]) demoteFromMainProtected() {
mainProtectedMaximum := p.mainProtectedMaximum
mainProtectedWeightedSize := p.mainProtectedWeightedSize
if mainProtectedWeightedSize <= mainProtectedMaximum {
return
}
@@ -438,36 +434,36 @@ func (p *Policy[K, V]) demoteFromMainProtected() {
break
}
demoted := p.Protected.PopFront()
demoted := p.protected.PopFront()
if node.Equals(demoted, nil) {
break
}
demoted.MakeMainProbation()
p.Probation.PushBack(demoted)
p.probation.PushBack(demoted)
mainProtectedWeightedSize -= uint64(demoted.Weight())
}
p.MainProtectedWeightedSize = mainProtectedWeightedSize
p.mainProtectedWeightedSize = mainProtectedWeightedSize
}
func (p *Policy[K, V]) increaseWindow() {
if p.MainProtectedMaximum == 0 {
func (p *policy[K, V]) increaseWindow() {
if p.mainProtectedMaximum == 0 {
return
}
quota := p.Adjustment
if p.MainProtectedMaximum < uint64(p.Adjustment) {
quota = int64(p.MainProtectedMaximum)
quota := p.adjustment
if p.mainProtectedMaximum < uint64(p.adjustment) {
quota = int64(p.mainProtectedMaximum)
}
p.MainProtectedMaximum -= uint64(quota)
p.WindowMaximum += uint64(quota)
p.mainProtectedMaximum -= uint64(quota)
p.windowMaximum += uint64(quota)
p.demoteFromMainProtected()
for i := 0; i < queueTransferThreshold; i++ {
candidate := p.Probation.Head()
candidate := p.probation.Head()
probation := true
if node.Equals(candidate, nil) || quota < int64(candidate.Weight()) {
candidate = p.Protected.Head()
candidate = p.protected.Head()
probation = false
}
if node.Equals(candidate, nil) {
@@ -481,36 +477,36 @@ func (p *Policy[K, V]) increaseWindow() {
quota -= int64(weight)
if probation {
p.Probation.Delete(candidate)
p.probation.Delete(candidate)
} else {
p.MainProtectedWeightedSize -= weight
p.Protected.Delete(candidate)
p.mainProtectedWeightedSize -= weight
p.protected.Delete(candidate)
}
p.WindowWeightedSize += weight
p.Window.PushBack(candidate)
p.windowWeightedSize += weight
p.window.PushBack(candidate)
candidate.MakeWindow()
}
p.MainProtectedMaximum += uint64(quota)
p.WindowMaximum -= uint64(quota)
p.Adjustment = quota
p.mainProtectedMaximum += uint64(quota)
p.windowMaximum -= uint64(quota)
p.adjustment = quota
}
func (p *Policy[K, V]) decreaseWindow() {
if p.WindowMaximum <= 1 {
func (p *policy[K, V]) decreaseWindow() {
if p.windowMaximum <= 1 {
return
}
quota := -p.Adjustment
windowMaximum := max(0, p.WindowMaximum-1)
if windowMaximum < uint64(-p.Adjustment) {
quota := -p.adjustment
windowMaximum := max(0, p.windowMaximum-1)
if windowMaximum < uint64(-p.adjustment) {
quota = int64(windowMaximum)
}
p.MainProtectedMaximum += uint64(quota)
p.WindowMaximum -= uint64(quota)
p.mainProtectedMaximum += uint64(quota)
p.windowMaximum -= uint64(quota)
for i := 0; i < queueTransferThreshold; i++ {
candidate := p.Window.Head()
candidate := p.window.Head()
if node.Equals(candidate, nil) {
break
}
@@ -521,15 +517,15 @@ func (p *Policy[K, V]) decreaseWindow() {
}
quota -= weight
p.WindowWeightedSize -= uint64(weight)
p.Window.Delete(candidate)
p.Probation.PushBack(candidate)
p.windowWeightedSize -= uint64(weight)
p.window.Delete(candidate)
p.probation.PushBack(candidate)
candidate.MakeMainProbation()
}
p.MainProtectedMaximum -= uint64(quota)
p.WindowMaximum += uint64(quota)
p.Adjustment = -quota
p.mainProtectedMaximum -= uint64(quota)
p.windowMaximum += uint64(quota)
p.adjustment = -quota
}
func abs(a float64) float64 {


@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tinylfu
package otter
import (
"math"
"math/bits"
"sync/atomic"
"github.com/maypok86/otter/v2/internal/xmath"
"github.com/maypok86/otter/v2/internal/xruntime"
@@ -27,76 +28,80 @@ const (
oneMask = 0x1111111111111111
)
// Sketch is a probabilistic multiset for estimating the popularity of an element within a time window. The
// sketch is a probabilistic multiset for estimating the popularity of an element within a time window. The
// maximum frequency of an element is limited to 15 (4-bits) and an aging process periodically
// halves the popularity of all elements.
type Sketch[K comparable] struct {
Table []uint64
SampleSize uint64
BlockMask uint64
Size uint64
hasher xruntime.Hasher[K]
type sketch[K comparable] struct {
table []uint64
sampleSize uint64
blockMask uint64
size uint64
hasher xruntime.Hasher[K]
isInitialized atomic.Bool
}
func newSketch[K comparable]() *Sketch[K] {
return &Sketch[K]{
func newSketch[K comparable]() *sketch[K] {
return &sketch[K]{
hasher: xruntime.NewHasher[K](),
}
}
func (s *Sketch[K]) EnsureCapacity(maximumSize uint64) {
if uint64(len(s.Table)) >= maximumSize {
func (s *sketch[K]) ensureCapacity(maximumSize uint64) {
if uint64(len(s.table)) >= maximumSize {
return
}
if !s.isInitialized.Load() {
s.isInitialized.Store(true)
}
newSize := xmath.RoundUpPowerOf264(maximumSize)
if newSize < 8 {
newSize = 8
}
s.Table = make([]uint64, newSize)
s.SampleSize = 10
s.table = make([]uint64, newSize)
s.sampleSize = 10
if maximumSize != 0 {
s.SampleSize = 10 * maximumSize
s.sampleSize = 10 * maximumSize
}
s.BlockMask = (uint64(len(s.Table)) >> 3) - 1
s.Size = 0
s.blockMask = (uint64(len(s.table)) >> 3) - 1
s.size = 0
s.hasher = xruntime.NewHasher[K]()
}
func (s *Sketch[K]) IsNotInitialized() bool {
return s.Table == nil
func (s *sketch[K]) isNotInitialized() bool {
return !s.isInitialized.Load()
}
func (s *Sketch[K]) Frequency(k K) uint64 {
if s.IsNotInitialized() {
func (s *sketch[K]) frequency(k K) uint64 {
if s.isNotInitialized() {
return 0
}
frequency := uint64(math.MaxUint64)
blockHash := s.hash(k)
counterHash := rehash(blockHash)
block := (blockHash & s.BlockMask) << 3
block := (blockHash & s.blockMask) << 3
for i := uint64(0); i < 4; i++ {
h := counterHash >> (i << 3)
index := (h >> 1) & 15
offset := h & 1
slot := block + offset + (i << 1)
count := (s.Table[slot] >> (index << 2)) & 0xf
count := (s.table[slot] >> (index << 2)) & 0xf
frequency = min(frequency, count)
}
return frequency
}
func (s *Sketch[K]) Increment(k K) {
if s.IsNotInitialized() {
func (s *sketch[K]) increment(k K) {
if s.isNotInitialized() {
return
}
blockHash := s.hash(k)
counterHash := rehash(blockHash)
block := (blockHash & s.BlockMask) << 3
block := (blockHash & s.blockMask) << 3
// Loop unrolling improves throughput by 10m ops/s
h0 := counterHash
@@ -120,34 +125,34 @@ func (s *Sketch[K]) Increment(k K) {
added = s.incrementAt(slot3, index3) || added
if added {
s.Size++
if s.Size == s.SampleSize {
s.size++
if s.size == s.sampleSize {
s.reset()
}
}
}
func (s *Sketch[K]) incrementAt(i, j uint64) bool {
func (s *sketch[K]) incrementAt(i, j uint64) bool {
offset := j << 2
mask := uint64(0xf) << offset
if (s.Table[i] & mask) != mask {
s.Table[i] += uint64(1) << offset
if (s.table[i] & mask) != mask {
s.table[i] += uint64(1) << offset
return true
}
return false
}
func (s *Sketch[K]) reset() {
func (s *sketch[K]) reset() {
count := 0
for i := 0; i < len(s.Table); i++ {
count += bits.OnesCount64(s.Table[i] & oneMask)
s.Table[i] = (s.Table[i] >> 1) & resetMask
for i := 0; i < len(s.table); i++ {
count += bits.OnesCount64(s.table[i] & oneMask)
s.table[i] = (s.table[i] >> 1) & resetMask
}
//nolint:gosec // there's no overflow
s.Size = (s.Size - (uint64(count) >> 2)) >> 1
s.size = (s.size - (uint64(count) >> 2)) >> 1
}
func (s *Sketch[K]) hash(k K) uint64 {
func (s *sketch[K]) hash(k K) uint64 {
return spread(s.hasher.Hash(k))
}
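The sketch packs sixteen 4-bit counters into each uint64 word. A standalone toy showing the saturating increment and the halving reset used above; incrementAt and the reset mask mirror the file's logic, reproduced here because the type is now unexported.

package main

import "fmt"

// resetMask keeps the low three bits of every 4-bit counter; the
// 0x7-per-nibble pattern clears the bit that crosses over from the
// neighboring counter during the right shift.
const resetMask = 0x7777777777777777

// incrementAt bumps 4-bit counter j inside word w, saturating at 15.
func incrementAt(w, j uint64) uint64 {
	offset := j << 2
	mask := uint64(0xf) << offset
	if (w & mask) != mask { // not yet saturated
		w += uint64(1) << offset
	}
	return w
}

func main() {
	var w uint64
	for i := 0; i < 20; i++ {
		w = incrementAt(w, 3)
	}
	fmt.Println((w >> 12) & 0xf) // 15: capped despite 20 increments

	// Aging: halve every counter at once, as reset does.
	w = (w >> 1) & resetMask
	fmt.Println((w >> 12) & 0xf) // 7
}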

vendor/modules.txt (vendored; 5 lines changed)

@@ -119,17 +119,16 @@ github.com/mailru/easyjson
github.com/mailru/easyjson/buffer
github.com/mailru/easyjson/jlexer
github.com/mailru/easyjson/jwriter
# github.com/maypok86/otter/v2 v2.0.0
# github.com/maypok86/otter/v2 v2.1.0
## explicit; go 1.24
github.com/maypok86/otter/v2
github.com/maypok86/otter/v2/internal/clock
github.com/maypok86/otter/v2/internal/deque
github.com/maypok86/otter/v2/internal/deque/queue
github.com/maypok86/otter/v2/internal/eviction/tinylfu
github.com/maypok86/otter/v2/internal/expiration
github.com/maypok86/otter/v2/internal/generated/node
github.com/maypok86/otter/v2/internal/hashmap
github.com/maypok86/otter/v2/internal/lossy
github.com/maypok86/otter/v2/internal/xiter
github.com/maypok86/otter/v2/internal/xmath
github.com/maypok86/otter/v2/internal/xruntime
github.com/maypok86/otter/v2/internal/xsync