From 21ceaaf1803fe9e3aef5d901f75f3cdb2c3dc295 Mon Sep 17 00:00:00 2001 From: Taras <505555+ribtoks@users.noreply.github.com> Date: Mon, 30 Jun 2025 13:42:15 +0000 Subject: [PATCH] Update module github.com/maypok86/otter/v2 to v2.1.0 (#180) Co-authored-by: Renovate Bot --- go.mod | 2 +- go.sum | 2 + .../github.com/maypok86/otter/v2/CHANGELOG.md | 21 + vendor/github.com/maypok86/otter/v2/README.md | 24 +- vendor/github.com/maypok86/otter/v2/cache.go | 173 +++- .../maypok86/otter/v2/cache_impl.go | 811 ++++++++++++------ vendor/github.com/maypok86/otter/v2/clock.go | 232 +++++ .../maypok86/otter/v2/internal/clock/real.go | 57 -- .../otter/v2/internal/deque/linked.go | 26 + .../otter/v2/internal/expiration/variable.go | 30 +- .../otter/v2/internal/generated/node/berw.go | 1 - .../otter/v2/internal/generated/node/bew.go | 1 - .../otter/v2/internal/generated/node/brw.go | 1 - .../otter/v2/internal/generated/node/bs.go | 1 - .../otter/v2/internal/generated/node/bse.go | 1 - .../otter/v2/internal/generated/node/bser.go | 1 - .../otter/v2/internal/generated/node/bsr.go | 1 - .../otter/v2/internal/generated/node/bw.go | 1 - .../maypok86/otter/v2/internal/xiter/xiter.go | 63 ++ .../maypok86/otter/v2/internal/xmath/xmath.go | 10 + .../github.com/maypok86/otter/v2/mkdocs.yml | 4 +- .../github.com/maypok86/otter/v2/options.go | 8 + .../maypok86/otter/v2/persistence.go | 155 ++++ .../{internal/eviction/tinylfu => }/policy.go | 360 ++++---- .../{internal/eviction/tinylfu => }/sketch.go | 79 +- vendor/modules.txt | 5 +- 26 files changed, 1497 insertions(+), 573 deletions(-) create mode 100644 vendor/github.com/maypok86/otter/v2/clock.go delete mode 100644 vendor/github.com/maypok86/otter/v2/internal/clock/real.go create mode 100644 vendor/github.com/maypok86/otter/v2/internal/xiter/xiter.go create mode 100644 vendor/github.com/maypok86/otter/v2/persistence.go rename vendor/github.com/maypok86/otter/v2/{internal/eviction/tinylfu => }/policy.go (52%) rename vendor/github.com/maypok86/otter/v2/{internal/eviction/tinylfu => }/sketch.go (64%) diff --git a/go.mod b/go.mod index c821175c..1d7f5080 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/joho/godotenv v1.5.1 github.com/jpillora/backoff v1.0.0 github.com/justinas/alice v1.2.0 - github.com/maypok86/otter/v2 v2.0.0 + github.com/maypok86/otter/v2 v2.1.0 github.com/prometheus/client_golang v1.22.0 github.com/realclientip/realclientip-go v1.0.0 github.com/rs/cors v1.11.1 diff --git a/go.sum b/go.sum index 23ee119f..a2dbfd1e 100644 --- a/go.sum +++ b/go.sum @@ -126,6 +126,8 @@ github.com/maypok86/otter/v2 v2.0.0-20250616223213-73ec080fc790 h1:omxwgqzRsRWJp github.com/maypok86/otter/v2 v2.0.0-20250616223213-73ec080fc790/go.mod h1:jX2xEKz9PrNVbDqnk8JUuOt5kURK8h7jd1kDYI5QsZk= github.com/maypok86/otter/v2 v2.0.0 h1:t+A6nUOQJZTRAQpDGltBKZBcRDjFfLBPTxEZGFfh48A= github.com/maypok86/otter/v2 v2.0.0/go.mod h1:jX2xEKz9PrNVbDqnk8JUuOt5kURK8h7jd1kDYI5QsZk= +github.com/maypok86/otter/v2 v2.1.0 h1:H+FO9NtLuSWYUlIUQ/kT6VNEpWSIF4w4GZJRDhxYb7k= +github.com/maypok86/otter/v2 v2.1.0/go.mod h1:jX2xEKz9PrNVbDqnk8JUuOt5kURK8h7jd1kDYI5QsZk= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= diff --git a/vendor/github.com/maypok86/otter/v2/CHANGELOG.md b/vendor/github.com/maypok86/otter/v2/CHANGELOG.md index e8871244..f626664d 100644 --- a/vendor/github.com/maypok86/otter/v2/CHANGELOG.md +++ b/vendor/github.com/maypok86/otter/v2/CHANGELOG.md @@ -1,3 +1,24 @@ +## 2.1.0 - 2025-06-29 + +### ✨Features + +- Added `Compute`, `ComputeIfAbsent` and `ComputeIfPresent` methods +- Added `LoadCacheFrom`, `LoadCacheFromFile`, `SaveCacheTo` and `SaveCacheToFile` functions +- Added `Clock` interface and option for time mocking +- Added `Keys` and `Values` iterators +- Added `Hottest` and `Coldest` iterators + +### 🚀 Improvements + +- Slightly reduced memory consumption +- Cache became significantly faster in cases when it's lightly populated +- Reduced number of allocations during refresh + +### 🐞 Bug Fixes + +- Fixed a bug in timer wheel ([#64](https://github.com/Yiling-J/theine-go/issues/64)) +- Added usage of `context.WithoutCancel` during refresh execution ([#124](https://github.com/maypok86/otter/issues/124)) + ## 2.0.0 - 2025-06-18 ### 📝 Description diff --git a/vendor/github.com/maypok86/otter/v2/README.md b/vendor/github.com/maypok86/otter/v2/README.md index ecc64ecb..55dd7657 100644 --- a/vendor/github.com/maypok86/otter/v2/README.md +++ b/vendor/github.com/maypok86/otter/v2/README.md @@ -4,7 +4,7 @@

-Go Reference +Go Reference @@ -34,7 +34,7 @@ Otter is designed to provide an excellent developer experience while maintaining Performance-wise, Otter provides: -- [High hit rates](https://maypok86.github.io/otter/performance/hit-ratio/) across all workload types via adaptive W-TinyLFU +- [High hit rates](https://maypok86.github.io/otter/performance/hit-ratio/) across all workload types via [adaptive W-TinyLFU](https://dl.acm.org/citation.cfm?id=3274816) - [Excellent throughput](https://maypok86.github.io/otter/performance/throughput/) under high contention on most workload types - Among the lowest [memory overheads](https://maypok86.github.io/otter/performance/memory-consumption/) across all cache capacities - Automatic data structures configuration based on contention/parallelism and workload patterns @@ -42,10 +42,12 @@ Performance-wise, Otter provides: Otter also provides a highly configurable caching API, enabling any combination of these optional features: - Size-based [eviction](https://maypok86.github.io/otter/user-guide/v2/features/eviction/#size-based) when a maximum is exceeded -- Time-based [expiration](https://maypok86.github.io/otter/user-guide/v2/features/eviction/#time-based) of entries (using [Hierarchical Timing Wheel](http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf)), measured since last access or last write +- Time-based [expiration](https://maypok86.github.io/otter/user-guide/v2/features/eviction/#time-based) of entries, measured since last access or last write - [Automatic loading](https://maypok86.github.io/otter/user-guide/v2/features/loading/) of entries into the cache - [Asynchronously refresh](https://maypok86.github.io/otter/user-guide/v2/features/refresh/) when the first stale request for an entry occurs +- [Writes propagated](https://maypok86.github.io/otter/user-guide/v2/features/compute/) to an external resource - Accumulation of cache access [statistics](https://maypok86.github.io/otter/user-guide/v2/features/statistics/) +- [Saving cache](https://maypok86.github.io/otter/user-guide/v2/features/persistence/) to a file and loading cache from a file ## 📚 Usage @@ -125,16 +127,13 @@ func main() { // Phase 2: Test cache stampede protection // -------------------------------------- - loader := otter.LoaderFunc[string, string](func(ctx context.Context, key string) (string, error) { - if key != "key" { - panic("incorrect key") // Validate key - } + loader := func(ctx context.Context, key string) (string, error) { time.Sleep(200 * time.Millisecond) // Simulate slow load return "value1", nil // Return new value - }) + } // Concurrent Gets would deduplicate loader calls - value, err := cache.Get(ctx, "key", loader) + value, err := cache.Get(ctx, "key", otter.LoaderFunc[string, string](loader)) if err != nil { panic(err) } @@ -148,15 +147,12 @@ func main() { // New loader that returns updated value loader = func(ctx context.Context, key string) (string, error) { - if key != "key" { - panic("incorrect key") - } time.Sleep(100 * time.Millisecond) // Simulate refresh return "value2", nil // Return refreshed value } // This triggers async refresh but returns current value - value, err = cache.Get(ctx, "key", loader) + value, err = cache.Get(ctx, "key", otter.LoaderFunc[string, string](loader)) if err != nil { panic(err) } @@ -221,7 +217,7 @@ Otter is based on the following papers: - [TinyLFU: A Highly Efficient Cache Admission Policy](https://dl.acm.org/citation.cfm?id=3149371) - [Adaptive Software Cache Management](https://dl.acm.org/citation.cfm?id=3274816) - [Denial of Service via Algorithmic Complexity Attack](https://www.usenix.org/legacy/events/sec03/tech/full_papers/crosby/crosby.pdf) -- [Hashed and Hierarchical Timing Wheels](http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf) +- [Hashed and Hierarchical Timing Wheels](https://ieeexplore.ieee.org/document/650142) - [A large scale analysis of hundreds of in-memory cache clusters at Twitter](https://www.usenix.org/system/files/osdi20-yang.pdf) ## 👏 Contribute diff --git a/vendor/github.com/maypok86/otter/v2/cache.go b/vendor/github.com/maypok86/otter/v2/cache.go index 2782d422..5c5adc98 100644 --- a/vendor/github.com/maypok86/otter/v2/cache.go +++ b/vendor/github.com/maypok86/otter/v2/cache.go @@ -21,6 +21,37 @@ import ( "time" ) +// ComputeOp tells the Compute methods what to do. +type ComputeOp int + +const ( + // CancelOp signals to Compute to not do anything as a result + // of executing the lambda. If the entry was not present in + // the map, nothing happens, and if it was present, the + // returned value is ignored. + CancelOp ComputeOp = iota + // WriteOp signals to Compute to update the entry to the + // value returned by the lambda, creating it if necessary. + WriteOp + // InvalidateOp signals to Compute to always discard the entry + // from the cache. + InvalidateOp +) + +var computeOpStrings = []string{ + "CancelOp", + "WriteOp", + "InvalidateOp", +} + +// String implements [fmt.Stringer] interface. +func (co ComputeOp) String() string { + if co >= 0 && int(co) < len(computeOpStrings) { + return computeOpStrings[co] + } + return "" +} + // Cache is an in-memory cache implementation that supports full concurrency of retrievals and multiple ways to bound the cache. type Cache[K comparable, V any] struct { cache *cache[K, V] @@ -100,6 +131,86 @@ func (c *Cache[K, V]) SetIfAbsent(key K, value V) (V, bool) { return c.cache.SetIfAbsent(key, value) } +// Compute either sets the computed new value for the key, +// invalidates the value for the key, or does nothing, based on +// the returned [ComputeOp]. When the op returned by remappingFunc +// is [WriteOp], the value is updated to the new value. If +// it is [InvalidateOp], the entry is removed from the cache +// altogether. And finally, if the op is [CancelOp] then the +// entry is left as-is. In other words, if it did not already +// exist, it is not created, and if it did exist, it is not +// updated. This is useful to synchronously execute some +// operation on the value without incurring the cost of +// updating the cache every time. +// +// The ok result indicates whether the entry is present in the cache after the compute operation. +// The actualValue result contains the value of the cache +// if a corresponding entry is present, or the zero value +// otherwise. You can think of these results as equivalent to regular key-value lookups in a map. +// +// This call locks a hash table bucket while the compute function +// is executed. It means that modifications on other entries in +// the bucket will be blocked until the remappingFunc executes. Consider +// this when the function includes long-running operations. +func (c *Cache[K, V]) Compute( + key K, + remappingFunc func(oldValue V, found bool) (newValue V, op ComputeOp), +) (actualValue V, ok bool) { + return c.cache.Compute(key, remappingFunc) +} + +// ComputeIfAbsent returns the existing value for the key if +// present. Otherwise, it tries to compute the value using the +// provided function. If mappingFunc returns true as the cancel value, the computation is cancelled and the zero value +// for type V is returned. +// +// The ok result indicates whether the entry is present in the cache after the compute operation. +// The actualValue result contains the value of the cache +// if a corresponding entry is present, or the zero value +// otherwise. You can think of these results as equivalent to regular key-value lookups in a map. +// +// This call locks a hash table bucket while the compute function +// is executed. It means that modifications on other entries in +// the bucket will be blocked until the valueFn executes. Consider +// this when the function includes long-running operations. +func (c *Cache[K, V]) ComputeIfAbsent( + key K, + mappingFunc func() (newValue V, cancel bool), +) (actualValue V, ok bool) { + return c.cache.ComputeIfAbsent(key, mappingFunc) +} + +// ComputeIfPresent returns the zero value for type V if the key is not found. +// Otherwise, it tries to compute the value using the provided function. +// +// ComputeIfPresent either sets the computed new value for the key, +// invalidates the value for the key, or does nothing, based on +// the returned [ComputeOp]. When the op returned by remappingFunc +// is [WriteOp], the value is updated to the new value. If +// it is [InvalidateOp], the entry is removed from the cache +// altogether. And finally, if the op is [CancelOp] then the +// entry is left as-is. In other words, if it did not already +// exist, it is not created, and if it did exist, it is not +// updated. This is useful to synchronously execute some +// operation on the value without incurring the cost of +// updating the cache every time. +// +// The ok result indicates whether the entry is present in the cache after the compute operation. +// The actualValue result contains the value of the cache +// if a corresponding entry is present, or the zero value +// otherwise. You can think of these results as equivalent to regular key-value lookups in a map. +// +// This call locks a hash table bucket while the compute function +// is executed. It means that modifications on other entries in +// the bucket will be blocked until the valueFn executes. Consider +// this when the function includes long-running operations. +func (c *Cache[K, V]) ComputeIfPresent( + key K, + remappingFunc func(oldValue V) (newValue V, op ComputeOp), +) (actualValue V, ok bool) { + return c.cache.ComputeIfPresent(key, remappingFunc) +} + // SetExpiresAfter specifies that the entry should be automatically removed from the cache once the duration has // elapsed. The expiration policy determines when the entry's age is reset. func (c *Cache[K, V]) SetExpiresAfter(key K, expiresAfter time.Duration) { @@ -124,6 +235,10 @@ func (c *Cache[K, V]) SetRefreshableAfter(key K, refreshableAfter time.Duration) // // No observable state associated with this cache is modified until loading completes. // +// WARNING: When performing a refresh (see [RefreshCalculator]), +// the [Loader] will receive a context wrapped in [context.WithoutCancel]. +// If you need to control refresh cancellation, you can use closures or values stored in the context. +// // WARNING: [Loader] must not attempt to update any mappings of this cache directly. // // WARNING: For any given key, every loader used with it should compute the same value. @@ -146,6 +261,10 @@ func (c *Cache[K, V]) Get(ctx context.Context, key K, loader Loader[K, V]) (V, e // // NOTE: duplicate elements in keys will be ignored. // +// WARNING: When performing a refresh (see [RefreshCalculator]), +// the [BulkLoader] will receive a context wrapped in [context.WithoutCancel]. +// If you need to control refresh cancellation, you can use closures or values stored in the context. +// // WARNING: [BulkLoader] must not attempt to update any mappings of this cache directly. // // WARNING: For any given key, every bulkLoader used with it should compute the same value. @@ -170,6 +289,10 @@ func (c *Cache[K, V]) BulkGet(ctx context.Context, keys []K, bulkLoader BulkLoad // // Refresh returns a channel that will receive the result when it is ready. The returned channel will not be closed. // +// WARNING: When performing a refresh (see [RefreshCalculator]), +// the [Loader] will receive a context wrapped in [context.WithoutCancel]. +// If you need to control refresh cancellation, you can use closures or values stored in the context. +// // WARNING: If the cache was constructed without [RefreshCalculator], then Refresh will return the nil channel. // // WARNING: Loader.Load and Loader.Reload must not attempt to update any mappings of this cache directly. @@ -197,6 +320,10 @@ func (c *Cache[K, V]) Refresh(ctx context.Context, key K, loader Loader[K, V]) < // // NOTE: duplicate elements in keys will be ignored. // +// WARNING: When performing a refresh (see [RefreshCalculator]), +// the [BulkLoader] will receive a context wrapped in [context.WithoutCancel]. +// If you need to control refresh cancellation, you can use closures or values stored in the context. +// // WARNING: If the cache was constructed without [RefreshCalculator], then BulkRefresh will return the nil channel. // // WARNING: BulkLoader.BulkLoad and BulkLoader.BulkReload must not attempt to update any mappings of this cache directly. @@ -228,6 +355,26 @@ func (c *Cache[K, V]) All() iter.Seq2[K, V] { return c.cache.All() } +// Keys returns an iterator over all keys in the cache. +// The iteration order is not specified and is not guaranteed to be the same from one call to the next. +// +// Iterator is at least weakly consistent: he is safe for concurrent use, +// but if the cache is modified (including by eviction) after the iterator is +// created, it is undefined which of the changes (if any) will be reflected in that iterator. +func (c *Cache[K, V]) Keys() iter.Seq[K] { + return c.cache.Keys() +} + +// Values returns an iterator over all values in the cache. +// The iteration order is not specified and is not guaranteed to be the same from one call to the next. +// +// Iterator is at least weakly consistent: he is safe for concurrent use, +// but if the cache is modified (including by eviction) after the iterator is +// created, it is undefined which of the changes (if any) will be reflected in that iterator. +func (c *Cache[K, V]) Values() iter.Seq[V] { + return c.cache.Values() +} + // InvalidateAll discards all entries in the cache. The behavior of this operation is undefined for an entry // that is being loaded (or reloaded) and is otherwise not present. func (c *Cache[K, V]) InvalidateAll() { @@ -249,7 +396,7 @@ func (c *Cache[K, V]) SetMaximum(maximum uint64) { } // GetMaximum returns the maximum total weighted or unweighted size of this cache, depending on how the -// cache was constructed. +// cache was constructed. If this cache does not use a (weighted) size bound, then the method will return math.MaxUint64. func (c *Cache[K, V]) GetMaximum() uint64 { return c.cache.GetMaximum() } @@ -268,6 +415,30 @@ func (c *Cache[K, V]) WeightedSize() uint64 { return c.cache.WeightedSize() } +// Hottest returns an iterator for ordered traversal of the cache entries. The order of +// iteration is from the entries most likely to be retained (hottest) to the entries least +// likely to be retained (coldest). This order is determined by the eviction policy's best guess +// at the start of the iteration. +// +// WARNING: Beware that this iteration is performed within the eviction policy's exclusive lock, so the +// iteration should be short and simple. While the iteration is in progress further eviction +// maintenance will be halted. +func (c *Cache[K, V]) Hottest() iter.Seq[Entry[K, V]] { + return c.cache.Hottest() +} + +// Coldest returns an iterator for ordered traversal of the cache entries. The order of +// iteration is from the entries least likely to be retained (coldest) to the entries most +// likely to be retained (hottest). This order is determined by the eviction policy's best guess +// at the start of the iteration. +// +// WARNING: Beware that this iteration is performed within the eviction policy's exclusive lock, so the +// iteration should be short and simple. While the iteration is in progress further eviction +// maintenance will be halted. +func (c *Cache[K, V]) Coldest() iter.Seq[Entry[K, V]] { + return c.cache.Coldest() +} + func (c *Cache[K, V]) has(key K) bool { return c.cache.has(key) } diff --git a/vendor/github.com/maypok86/otter/v2/cache_impl.go b/vendor/github.com/maypok86/otter/v2/cache_impl.go index e7968a23..7eabcae4 100644 --- a/vendor/github.com/maypok86/otter/v2/cache_impl.go +++ b/vendor/github.com/maypok86/otter/v2/cache_impl.go @@ -15,31 +15,32 @@ package otter import ( + "cmp" "context" "errors" "fmt" "iter" + "math" "runtime" "sync" "sync/atomic" "time" - "github.com/maypok86/otter/v2/internal/clock" "github.com/maypok86/otter/v2/internal/deque/queue" - "github.com/maypok86/otter/v2/internal/eviction/tinylfu" "github.com/maypok86/otter/v2/internal/expiration" "github.com/maypok86/otter/v2/internal/generated/node" "github.com/maypok86/otter/v2/internal/hashmap" "github.com/maypok86/otter/v2/internal/lossy" + "github.com/maypok86/otter/v2/internal/xiter" "github.com/maypok86/otter/v2/internal/xmath" "github.com/maypok86/otter/v2/internal/xruntime" "github.com/maypok86/otter/v2/stats" ) const ( - unreachableExpiresAfter = xruntime.MaxDuration - unreachableRefreshableAfter = xruntime.MaxDuration - noTime = int64(0) + unreachableExpiresAt = int64(xruntime.MaxDuration) + unreachableRefreshableAt = int64(xruntime.MaxDuration) + noTime = int64(0) minWriteBufferSize = 4 writeBufferRetries = 100 @@ -81,11 +82,12 @@ type cache[K comparable, V any] struct { _ [xruntime.CacheLineSize - 4]byte nodeManager *node.Manager[K, V] hashmap *hashmap.Map[K, V, node.Node[K, V]] - evictionPolicy *tinylfu.Policy[K, V] + evictionPolicy *policy[K, V] expirationPolicy *expiration.Variable[K, V] stats stats.Recorder logger Logger - clock *clock.Real + clock timeSource + statsClock *realSource readBuffer *lossy.Striped[K, V] writeBuffer *queue.MPSC[task[K, V]] executor func(fn func()) @@ -105,7 +107,6 @@ type cache[K comparable, V any] struct { withEviction bool isWeighted bool withMaintenance bool - withStats bool } // newCache returns a new cache instance based on the settings from Options. @@ -121,41 +122,44 @@ func newCache[K comparable, V any](o *Options[K, V]) *cache[K, V] { maximum := o.getMaximum() withEviction := maximum > 0 - var readBuffer *lossy.Striped[K, V] - if withEviction { - readBuffer = lossy.NewStriped(maxStripedBufferSize, nodeManager) - } - withStats := o.StatsRecorder != nil + if withStats { + _, ok := o.StatsRecorder.(*stats.NoopRecorder) + withStats = withStats && !ok + } + statsRecorder := o.StatsRecorder if !withStats { - o.StatsRecorder = &stats.NoopRecorder{} + statsRecorder = &stats.NoopRecorder{} } c := &cache[K, V]{ nodeManager: nodeManager, hashmap: hashmap.NewWithSize[K, V, node.Node[K, V]](nodeManager, o.getInitialCapacity()), - stats: o.StatsRecorder, + stats: statsRecorder, logger: o.getLogger(), - readBuffer: readBuffer, singleflight: &group[K, V]{}, executor: o.getExecutor(), hasDefaultExecutor: o.Executor == nil, weigher: o.getWeigher(), onDeletion: o.OnDeletion, onAtomicDeletion: o.OnAtomicDeletion, - clock: &clock.Real{}, - withStats: withStats, + clock: newTimeSource(o.Clock), + statsClock: &realSource{}, expiryCalculator: o.ExpiryCalculator, refreshCalculator: o.RefreshCalculator, isWeighted: withWeight, } + if withStats { + c.statsClock.Init() + } + c.withEviction = withEviction if c.withEviction { - c.evictionPolicy = tinylfu.NewPolicy[K, V](withWeight) + c.evictionPolicy = newPolicy[K, V](withWeight) if o.hasInitialCapacity() { //nolint:gosec // there's no overflow - c.evictionPolicy.EnsureCapacity(min(maximum, uint64(o.getInitialCapacity()))) + c.evictionPolicy.sketch.ensureCapacity(min(maximum, uint64(o.getInitialCapacity()))) } } @@ -169,6 +173,7 @@ func newCache[K comparable, V any](o *Options[K, V]) *cache[K, V] { c.withMaintenance = c.withEviction || c.withExpiration if c.withMaintenance { + c.readBuffer = lossy.NewStriped(maxStripedBufferSize, nodeManager) c.writeBuffer = queue.NewMPSC[task[K, V]](minWriteBufferSize, maxWriteBufferSize) } if c.withTime { @@ -188,29 +193,29 @@ func newCache[K comparable, V any](o *Options[K, V]) *cache[K, V] { func (c *cache[K, V]) newNode(key K, value V, old node.Node[K, V]) node.Node[K, V] { weight := c.weigher(key, value) - expiresAt := int64(unreachableExpiresAfter) + expiresAt := unreachableExpiresAt if c.withExpiration && old != nil { expiresAt = old.ExpiresAt() } - refreshableAt := int64(unreachableRefreshableAfter) + refreshableAt := unreachableRefreshableAt if c.withRefresh && old != nil { refreshableAt = old.RefreshableAt() } return c.nodeManager.Create(key, value, expiresAt, refreshableAt, weight) } -func (c *cache[K, V]) nodeToEntry(n node.Node[K, V], offset int64) Entry[K, V] { +func (c *cache[K, V]) nodeToEntry(n node.Node[K, V], nanos int64) Entry[K, V] { nowNano := noTime if c.withTime { - nowNano = offset + nowNano = nanos } - expiresAt := int64(unreachableExpiresAfter) + expiresAt := unreachableExpiresAt if c.withExpiration { expiresAt = n.ExpiresAt() } - refreshableAt := int64(unreachableRefreshableAfter) + refreshableAt := unreachableRefreshableAt if c.withRefresh { refreshableAt = n.RefreshableAt() } @@ -233,8 +238,8 @@ func (c *cache[K, V]) has(key K) bool { // GetIfPresent returns the value associated with the key in this cache. func (c *cache[K, V]) GetIfPresent(key K) (V, bool) { - offset := c.clock.Offset() - n := c.getNode(key, offset) + nowNano := c.clock.NowNano() + n := c.getNode(key, nowNano) if n == nil { return zeroValue[V](), false } @@ -243,7 +248,7 @@ func (c *cache[K, V]) GetIfPresent(key K) (V, bool) { } // getNode returns the node associated with the key in this cache. -func (c *cache[K, V]) getNode(key K, offset int64) node.Node[K, V] { +func (c *cache[K, V]) getNode(key K, nowNano int64) node.Node[K, V] { n := c.hashmap.Get(key) if n == nil { c.stats.RecordMisses(1) @@ -252,13 +257,13 @@ func (c *cache[K, V]) getNode(key K, offset int64) node.Node[K, V] { } return nil } - if n.HasExpired(offset) { + if n.HasExpired(nowNano) { c.stats.RecordMisses(1) c.scheduleDrainBuffers() return nil } - c.afterRead(n, offset, true, true) + c.afterRead(n, nowNano, true, true) return n } @@ -267,22 +272,22 @@ func (c *cache[K, V]) getNode(key K, offset int64) node.Node[K, V] { // // Unlike getNode, this function does not produce any side effects // such as updating statistics or the eviction policy. -func (c *cache[K, V]) getNodeQuietly(key K, offset int64) node.Node[K, V] { +func (c *cache[K, V]) getNodeQuietly(key K, nowNano int64) node.Node[K, V] { n := c.hashmap.Get(key) - if n == nil || !n.IsAlive() || n.HasExpired(offset) { + if n == nil || !n.IsAlive() || n.HasExpired(nowNano) { return nil } return n } -func (c *cache[K, V]) afterRead(got node.Node[K, V], offset int64, recordHit, calcExpiresAt bool) { +func (c *cache[K, V]) afterRead(got node.Node[K, V], nowNano int64, recordHit, calcExpiresAt bool) { if recordHit { c.stats.RecordHits(1) } if calcExpiresAt { - c.calcExpiresAtAfterRead(got, offset) + c.calcExpiresAtAfterRead(got, nowNano) } delayable := c.skipReadBuffer() || c.readBuffer.Add(got) != lossy.Full @@ -309,36 +314,36 @@ func (c *cache[K, V]) SetIfAbsent(key K, value V) (V, bool) { return c.set(key, value, true) } -func (c *cache[K, V]) calcExpiresAtAfterRead(n node.Node[K, V], offset int64) { +func (c *cache[K, V]) calcExpiresAtAfterRead(n node.Node[K, V], nowNano int64) { if !c.withExpiration { return } - expiresAfter := c.expiryCalculator.ExpireAfterRead(c.nodeToEntry(n, offset)) - c.setExpiresAfterRead(n, offset, expiresAfter) + expiresAfter := c.expiryCalculator.ExpireAfterRead(c.nodeToEntry(n, nowNano)) + c.setExpiresAfterRead(n, nowNano, expiresAfter) } -func (c *cache[K, V]) setExpiresAfterRead(n node.Node[K, V], offset int64, expiresAfter time.Duration) { +func (c *cache[K, V]) setExpiresAfterRead(n node.Node[K, V], nowNano int64, expiresAfter time.Duration) { if expiresAfter <= 0 { return } expiresAt := n.ExpiresAt() - currentDuration := time.Duration(expiresAt - offset) + currentDuration := time.Duration(expiresAt - nowNano) diff := xmath.Abs(int64(expiresAfter - currentDuration)) if diff > 0 { - n.CASExpiresAt(expiresAt, offset+int64(expiresAfter)) + n.CASExpiresAt(expiresAt, nowNano+int64(expiresAfter)) } } // GetEntry returns the cache entry associated with the key in this cache. func (c *cache[K, V]) GetEntry(key K) (Entry[K, V], bool) { - offset := c.clock.Offset() - n := c.getNode(key, offset) + nowNano := c.clock.NowNano() + n := c.getNode(key, nowNano) if n == nil { return Entry[K, V]{}, false } - return c.nodeToEntry(n, offset), true + return c.nodeToEntry(n, nowNano), true } // GetEntryQuietly returns the cache entry associated with the key in this cache. @@ -346,12 +351,12 @@ func (c *cache[K, V]) GetEntry(key K) (Entry[K, V], bool) { // Unlike GetEntry, this function does not produce any side effects // such as updating statistics or the eviction policy. func (c *cache[K, V]) GetEntryQuietly(key K) (Entry[K, V], bool) { - offset := c.clock.Offset() - n := c.getNodeQuietly(key, offset) + nowNano := c.clock.NowNano() + n := c.getNodeQuietly(key, nowNano) if n == nil { return Entry[K, V]{}, false } - return c.nodeToEntry(n, offset), true + return c.nodeToEntry(n, nowNano), true } // SetExpiresAfter specifies that the entry should be automatically removed from the cache once the duration has @@ -361,14 +366,14 @@ func (c *cache[K, V]) SetExpiresAfter(key K, expiresAfter time.Duration) { return } - offset := c.clock.Offset() + nowNano := c.clock.NowNano() n := c.hashmap.Get(key) if n == nil { return } - c.setExpiresAfterRead(n, offset, expiresAfter) - c.afterRead(n, offset, false, false) + c.setExpiresAfterRead(n, nowNano, expiresAfter) + c.afterRead(n, nowNano, false, false) } // SetRefreshableAfter specifies that each entry should be eligible for reloading once a fixed duration has elapsed. @@ -378,25 +383,25 @@ func (c *cache[K, V]) SetRefreshableAfter(key K, refreshableAfter time.Duration) return } - offset := c.clock.Offset() + nowNano := c.clock.NowNano() n := c.hashmap.Get(key) if n == nil { return } - entry := c.nodeToEntry(n, offset) + entry := c.nodeToEntry(n, nowNano) currentDuration := entry.RefreshableAfter() if refreshableAfter > 0 && currentDuration != refreshableAfter { - n.SetRefreshableAt(offset + int64(refreshableAfter)) + n.SetRefreshableAt(nowNano + int64(refreshableAfter)) } } -func (c *cache[K, V]) calcExpiresAtAfterWrite(n, old node.Node[K, V], offset int64) { +func (c *cache[K, V]) calcExpiresAtAfterWrite(n, old node.Node[K, V], nowNano int64) { if !c.withExpiration { return } - entry := c.nodeToEntry(n, offset) + entry := c.nodeToEntry(n, nowNano) currentDuration := entry.ExpiresAfter() var expiresAfter time.Duration if old == nil { @@ -406,55 +411,254 @@ func (c *cache[K, V]) calcExpiresAtAfterWrite(n, old node.Node[K, V], offset int } if expiresAfter > 0 && currentDuration != expiresAfter { - n.SetExpiresAt(offset + int64(expiresAfter)) + n.SetExpiresAt(nowNano + int64(expiresAfter)) } } func (c *cache[K, V]) set(key K, value V, onlyIfAbsent bool) (V, bool) { - var ( - old node.Node[K, V] - n node.Node[K, V] - ) - offset := c.clock.Offset() - c.hashmap.Compute(key, func(current node.Node[K, V]) node.Node[K, V] { + var old node.Node[K, V] + nowNano := c.clock.NowNano() + n := c.hashmap.Compute(key, func(current node.Node[K, V]) node.Node[K, V] { old = current - if onlyIfAbsent && current != nil { + if onlyIfAbsent && current != nil && !current.HasExpired(nowNano) { // no op - c.calcExpiresAtAfterRead(old, offset) + c.calcExpiresAtAfterRead(old, nowNano) return current } // set - c.singleflight.delete(key) - n = c.newNode(key, value, old) - c.calcExpiresAtAfterWrite(n, old, offset) - c.calcRefreshableAt(n, old, nil, offset) - c.makeRetired(old) - if old != nil { - cause := CauseReplacement - if old.HasExpired(offset) { - cause = CauseExpiration - } - c.notifyAtomicDeletion(old.Key(), old.Value(), cause) - } - return n + return c.atomicSet(key, value, old, nil, nowNano) }) if onlyIfAbsent { - if old == nil { - c.afterWrite(n, nil, offset) + if old == nil || old.HasExpired(nowNano) { + c.afterWrite(n, old, nowNano) return value, true } - c.afterRead(old, offset, false, false) + c.afterRead(old, nowNano, false, false) return old.Value(), false } - c.afterWrite(n, old, offset) + c.afterWrite(n, old, nowNano) if old != nil { return old.Value(), false } return value, true } -func (c *cache[K, V]) afterWrite(n, old node.Node[K, V], offset int64) { +func (c *cache[K, V]) atomicSet(key K, value V, old node.Node[K, V], cl *call[K, V], nowNano int64) node.Node[K, V] { + if cl == nil { + c.singleflight.delete(key) + } + n := c.newNode(key, value, old) + c.calcExpiresAtAfterWrite(n, old, nowNano) + c.calcRefreshableAt(n, old, cl, nowNano) + c.makeRetired(old) + if old != nil { + cause := getCause(old, nowNano, CauseReplacement) + c.notifyAtomicDeletion(old.Key(), old.Value(), cause) + } + return n +} + +//nolint:unparam // it's ok +func (c *cache[K, V]) atomicDelete(key K, old node.Node[K, V], cl *call[K, V], nowNano int64) node.Node[K, V] { + if cl == nil { + c.singleflight.delete(key) + } + if old != nil { + cause := getCause(old, nowNano, CauseInvalidation) + c.makeRetired(old) + c.notifyAtomicDeletion(old.Key(), old.Value(), cause) + } + return nil +} + +// Compute either sets the computed new value for the key, +// invalidates the value for the key, or does nothing, based on +// the returned [ComputeOp]. When the op returned by remappingFunc +// is [WriteOp], the value is updated to the new value. If +// it is [InvalidateOp], the entry is removed from the cache +// altogether. And finally, if the op is [CancelOp] then the +// entry is left as-is. In other words, if it did not already +// exist, it is not created, and if it did exist, it is not +// updated. This is useful to synchronously execute some +// operation on the value without incurring the cost of +// updating the cache every time. +// +// The ok result indicates whether the entry is present in the cache after the compute operation. +// The actualValue result contains the value of the cache +// if a corresponding entry is present, or the zero value otherwise. +// You can think of these results as equivalent to regular key-value lookups in a map. +// +// This call locks a hash table bucket while the compute function +// is executed. It means that modifications on other entries in +// the bucket will be blocked until the remappingFunc executes. Consider +// this when the function includes long-running operations. +func (c *cache[K, V]) Compute( + key K, + remappingFunc func(oldValue V, found bool) (newValue V, op ComputeOp), +) (V, bool) { + return c.doCompute(key, remappingFunc, c.clock.NowNano(), true) +} + +// ComputeIfAbsent returns the existing value for the key if +// present. Otherwise, it tries to compute the value using the +// provided function. If mappingFunc returns true as the cancel value, the computation is cancelled and the zero value +// for type V is returned. +// +// The ok result indicates whether the entry is present in the cache after the compute operation. +// The actualValue result contains the value of the cache +// if a corresponding entry is present, or the zero value +// otherwise. You can think of these results as equivalent to regular key-value lookups in a map. +// +// This call locks a hash table bucket while the compute function +// is executed. It means that modifications on other entries in +// the bucket will be blocked until the valueFn executes. Consider +// this when the function includes long-running operations. +func (c *cache[K, V]) ComputeIfAbsent( + key K, + mappingFunc func() (newValue V, cancel bool), +) (V, bool) { + nowNano := c.clock.NowNano() + if n := c.getNode(key, nowNano); n != nil { + return n.Value(), true + } + + return c.doCompute(key, func(oldValue V, found bool) (newValue V, op ComputeOp) { + if found { + return oldValue, CancelOp + } + newValue, cancel := mappingFunc() + if cancel { + return zeroValue[V](), CancelOp + } + return newValue, WriteOp + }, nowNano, false) +} + +// ComputeIfPresent returns the zero value for type V if the key is not found. +// Otherwise, it tries to compute the value using the provided function. +// +// ComputeIfPresent either sets the computed new value for the key, +// invalidates the value for the key, or does nothing, based on +// the returned [ComputeOp]. When the op returned by remappingFunc +// is [WriteOp], the value is updated to the new value. If +// it is [InvalidateOp], the entry is removed from the cache +// altogether. And finally, if the op is [CancelOp] then the +// entry is left as-is. In other words, if it did not already +// exist, it is not created, and if it did exist, it is not +// updated. This is useful to synchronously execute some +// operation on the value without incurring the cost of +// updating the cache every time. +// +// The ok result indicates whether the entry is present in the cache after the compute operation. +// The actualValue result contains the value of the cache +// if a corresponding entry is present, or the zero value +// otherwise. You can think of these results as equivalent to regular key-value lookups in a map. +// +// This call locks a hash table bucket while the compute function +// is executed. It means that modifications on other entries in +// the bucket will be blocked until the valueFn executes. Consider +// this when the function includes long-running operations. +func (c *cache[K, V]) ComputeIfPresent( + key K, + remappingFunc func(oldValue V) (newValue V, op ComputeOp), +) (V, bool) { + nowNano := c.clock.NowNano() + if n := c.getNode(key, nowNano); n == nil { + return zeroValue[V](), false + } + + return c.doCompute(key, func(oldValue V, found bool) (newValue V, op ComputeOp) { + if found { + return remappingFunc(oldValue) + } + return zeroValue[V](), CancelOp + }, nowNano, false) +} + +func (c *cache[K, V]) doCompute( + key K, + remappingFunc func(oldValue V, found bool) (newValue V, op ComputeOp), + nowNano int64, + recordStats bool, +) (V, bool) { + var ( + old node.Node[K, V] + op ComputeOp + notValidOp bool + panicErr error + ) + computedNode := c.hashmap.Compute(key, func(oldNode node.Node[K, V]) node.Node[K, V] { + var ( + oldValue V + actualValue V + found bool + ) + if oldNode != nil && !oldNode.HasExpired(nowNano) { + oldValue = oldNode.Value() + found = true + } + old = oldNode + + func() { + defer func() { + if r := recover(); r != nil { + panicErr = newPanicError(r) + } + }() + + actualValue, op = remappingFunc(oldValue, found) + }() + if panicErr != nil { + return oldNode + } + if op == CancelOp { + if oldNode != nil && oldNode.HasExpired(nowNano) { + return c.atomicDelete(key, oldNode, nil, nowNano) + } + return oldNode + } + if op == WriteOp { + return c.atomicSet(key, actualValue, old, nil, nowNano) + } + if op == InvalidateOp { + return c.atomicDelete(key, old, nil, nowNano) + } + notValidOp = true + return oldNode + }) + if panicErr != nil { + panic(panicErr) + } + if notValidOp { + panic(fmt.Sprintf("otter: invalid ComputeOp: %d", op)) + } + if recordStats { + if old != nil && !old.HasExpired(nowNano) { + c.stats.RecordHits(1) + } else { + c.stats.RecordMisses(1) + } + } + switch op { + case CancelOp: + if computedNode == nil { + c.afterDelete(old, nowNano, false) + return zeroValue[V](), false + } + return computedNode.Value(), true + case WriteOp: + c.afterWrite(computedNode, old, nowNano) + case InvalidateOp: + c.afterDelete(old, nowNano, false) + } + if computedNode == nil { + return zeroValue[V](), false + } + return computedNode.Value(), true +} + +func (c *cache[K, V]) afterWrite(n, old node.Node[K, V], nowNano int64) { if !c.withMaintenance { if old != nil { c.notifyDeletion(old.Key(), old.Value(), CauseReplacement) @@ -469,11 +673,7 @@ func (c *cache[K, V]) afterWrite(n, old node.Node[K, V], offset int64) { } // update - cause := CauseReplacement - if old.HasExpired(offset) { - cause = CauseExpiration - } - + cause := getCause(old, nowNano, CauseReplacement) c.afterWriteTask(c.getTask(n, old, updateReason, cause)) } @@ -482,12 +682,20 @@ type refreshableKey[K comparable, V any] struct { old node.Node[K, V] } -func (c *cache[K, V]) refreshKey(ctx context.Context, rk refreshableKey[K, V], loader Loader[K, V]) <-chan RefreshResult[K, V] { +func (c *cache[K, V]) refreshKey( + ctx context.Context, + rk refreshableKey[K, V], + loader Loader[K, V], + isManual bool, +) <-chan RefreshResult[K, V] { if !c.withRefresh { return nil } - ch := make(chan RefreshResult[K, V], 1) + var ch chan RefreshResult[K, V] + if isManual { + ch = make(chan RefreshResult[K, V], 1) + } c.executor(func() { var refresher func(ctx context.Context, key K) (V, error) @@ -499,26 +707,26 @@ func (c *cache[K, V]) refreshKey(ctx context.Context, rk refreshableKey[K, V], l refresher = loader.Load } - loadCtx, cancel := context.WithCancel(ctx) - defer cancel() - cl, shouldLoad := c.singleflight.startCall(rk.key, true) if shouldLoad { //nolint:errcheck // there is no need to check error _ = c.wrapLoad(func() error { + loadCtx := context.WithoutCancel(ctx) return c.singleflight.doCall(loadCtx, cl, refresher, c.afterDeleteCall) }) } cl.wait() if cl.err != nil && !cl.isNotFound { - c.logger.Error(loadCtx, "Returned an error during the refreshing", cl.err) + c.logger.Error(ctx, "Returned an error during the refreshing", cl.err) } - ch <- RefreshResult[K, V]{ - Key: cl.key, - Value: cl.value, - Err: cl.err, + if isManual { + ch <- RefreshResult[K, V]{ + Key: cl.key, + Value: cl.value, + Err: cl.err, + } } }) @@ -547,31 +755,23 @@ func (c *cache[K, V]) refreshKey(ctx context.Context, rk refreshableKey[K, V], l func (c *cache[K, V]) Get(ctx context.Context, key K, loader Loader[K, V]) (V, error) { c.singleflight.init() - offset := c.clock.Offset() - n := c.getNode(key, offset) + nowNano := c.clock.NowNano() + n := c.getNode(key, nowNano) if n != nil { - if !n.IsFresh(offset) { + if !n.IsFresh(nowNano) { c.refreshKey(ctx, refreshableKey[K, V]{ key: n.Key(), old: n, - }, loader) + }, loader, false) } return n.Value(), nil } - if c.withStats { - c.clock.Init() - } - - // node.Node compute? - loadCtx, cancel := context.WithCancel(ctx) - defer cancel() - cl, shouldLoad := c.singleflight.startCall(key, false) if shouldLoad { //nolint:errcheck // there is no need to check error _ = c.wrapLoad(func() error { - return c.singleflight.doCall(loadCtx, cl, loader.Load, c.afterDeleteCall) + return c.singleflight.doCall(ctx, cl, loader.Load, c.afterDeleteCall) }) } cl.wait() @@ -583,13 +783,13 @@ func (c *cache[K, V]) Get(ctx context.Context, key K, loader Loader[K, V]) (V, e return cl.value, nil } -func (c *cache[K, V]) calcRefreshableAt(n, old node.Node[K, V], cl *call[K, V], offset int64) { +func (c *cache[K, V]) calcRefreshableAt(n, old node.Node[K, V], cl *call[K, V], nowNano int64) { if !c.withRefresh { return } var refreshableAfter time.Duration - entry := c.nodeToEntry(n, offset) + entry := c.nodeToEntry(n, nowNano) currentDuration := entry.RefreshableAfter() //nolint:gocritic // it's ok if cl != nil && cl.isRefresh && old != nil { @@ -608,7 +808,7 @@ func (c *cache[K, V]) calcRefreshableAt(n, old node.Node[K, V], cl *call[K, V], } if refreshableAfter > 0 && currentDuration != refreshableAfter { - n.SetRefreshableAt(offset + int64(refreshableAfter)) + n.SetRefreshableAt(nowNano + int64(refreshableAfter)) } } @@ -618,23 +818,17 @@ func (c *cache[K, V]) afterDeleteCall(cl *call[K, V]) { deleted bool old node.Node[K, V] ) - offset := c.clock.Offset() + nowNano := c.clock.NowNano() newNode := c.hashmap.Compute(cl.key, func(oldNode node.Node[K, V]) node.Node[K, V] { - defer cl.cancel() - isCorrectCall := cl.isFake || c.singleflight.deleteCall(cl) old = oldNode if isCorrectCall && cl.isNotFound { - if oldNode != nil { - deleted = true - c.makeRetired(oldNode) - c.notifyAtomicDeletion(oldNode.Key(), oldNode.Value(), CauseInvalidation) - } - return nil + deleted = oldNode != nil + return c.atomicDelete(cl.key, oldNode, cl, nowNano) } if cl.err != nil { if cl.isRefresh && oldNode != nil { - c.calcRefreshableAt(oldNode, oldNode, cl, offset) + c.calcRefreshableAt(oldNode, oldNode, cl, nowNano) } return oldNode } @@ -642,34 +836,34 @@ func (c *cache[K, V]) afterDeleteCall(cl *call[K, V]) { return oldNode } inserted = true - n := c.newNode(cl.key, cl.value, old) - c.calcExpiresAtAfterWrite(n, old, offset) - c.calcRefreshableAt(n, old, cl, offset) - c.makeRetired(old) - if old != nil { - cause := CauseReplacement - if old.HasExpired(offset) { - cause = CauseExpiration - } - c.notifyAtomicDeletion(old.Key(), old.Value(), cause) - } - return n + return c.atomicSet(cl.key, cl.value, old, cl, nowNano) }) + cl.cancel() if deleted { - c.afterDelete(old, offset, false) + c.afterDelete(old, nowNano, false) } if inserted { - c.afterWrite(newNode, old, offset) + c.afterWrite(newNode, old, nowNano) } } -func (c *cache[K, V]) bulkRefreshKeys(ctx context.Context, rks []refreshableKey[K, V], bulkLoader BulkLoader[K, V]) <-chan []RefreshResult[K, V] { +func (c *cache[K, V]) bulkRefreshKeys( + ctx context.Context, + rks []refreshableKey[K, V], + bulkLoader BulkLoader[K, V], + isManual bool, +) <-chan []RefreshResult[K, V] { if !c.withRefresh { return nil } - ch := make(chan []RefreshResult[K, V], 1) + var ch chan []RefreshResult[K, V] + if isManual { + ch = make(chan []RefreshResult[K, V], 1) + } if len(rks) == 0 { - ch <- []RefreshResult[K, V]{} + if isManual { + ch <- []RefreshResult[K, V]{} + } return ch } @@ -678,8 +872,11 @@ func (c *cache[K, V]) bulkRefreshKeys(ctx context.Context, rks []refreshableKey[ toLoadCalls map[K]*call[K, V] toReloadCalls map[K]*call[K, V] foundCalls []*call[K, V] + results []RefreshResult[K, V] ) - results := make([]RefreshResult[K, V], 0, len(rks)) + if isManual { + results = make([]RefreshResult[K, V], 0, len(rks)) + } i := 0 for _, rk := range rks { cl, shouldLoad := c.singleflight.startCall(rk.key, true) @@ -689,13 +886,11 @@ func (c *cache[K, V]) bulkRefreshKeys(ctx context.Context, rks []refreshableKey[ toReloadCalls = make(map[K]*call[K, V], len(rks)-i) } cl.value = rk.old.Value() - toReloadCalls[rk.key] = cl } else { if toLoadCalls == nil { toLoadCalls = make(map[K]*call[K, V], len(rks)-i) } - toLoadCalls[rk.key] = cl } } else { @@ -707,20 +902,23 @@ func (c *cache[K, V]) bulkRefreshKeys(ctx context.Context, rks []refreshableKey[ i++ } + loadCtx := context.WithoutCancel(ctx) if len(toLoadCalls) > 0 { loadErr := c.wrapLoad(func() error { - return c.singleflight.doBulkCall(ctx, toLoadCalls, bulkLoader.BulkLoad, c.afterDeleteCall) + return c.singleflight.doBulkCall(loadCtx, toLoadCalls, bulkLoader.BulkLoad, c.afterDeleteCall) }) if loadErr != nil { c.logger.Error(ctx, "BulkLoad returned an error", loadErr) } - for _, cl := range toLoadCalls { - results = append(results, RefreshResult[K, V]{ - Key: cl.key, - Value: cl.value, - Err: cl.err, - }) + if isManual { + for _, cl := range toLoadCalls { + results = append(results, RefreshResult[K, V]{ + Key: cl.key, + Value: cl.value, + Err: cl.err, + }) + } } } if len(toReloadCalls) > 0 { @@ -735,13 +933,25 @@ func (c *cache[K, V]) bulkRefreshKeys(ctx context.Context, rks []refreshableKey[ } reloadErr := c.wrapLoad(func() error { - return c.singleflight.doBulkCall(ctx, toReloadCalls, reload, c.afterDeleteCall) + return c.singleflight.doBulkCall(loadCtx, toReloadCalls, reload, c.afterDeleteCall) }) if reloadErr != nil { c.logger.Error(ctx, "BulkReload returned an error", reloadErr) } - for _, cl := range toReloadCalls { + if isManual { + for _, cl := range toReloadCalls { + results = append(results, RefreshResult[K, V]{ + Key: cl.key, + Value: cl.value, + Err: cl.err, + }) + } + } + } + for _, cl := range foundCalls { + cl.wait() + if isManual { results = append(results, RefreshResult[K, V]{ Key: cl.key, Value: cl.value, @@ -749,15 +959,9 @@ func (c *cache[K, V]) bulkRefreshKeys(ctx context.Context, rks []refreshableKey[ }) } } - for _, cl := range foundCalls { - cl.wait() - results = append(results, RefreshResult[K, V]{ - Key: cl.key, - Value: cl.value, - Err: cl.err, - }) + if isManual { + ch <- results } - ch <- results }) return ch @@ -782,7 +986,7 @@ func (c *cache[K, V]) bulkRefreshKeys(ctx context.Context, rks []refreshableKey[ func (c *cache[K, V]) BulkGet(ctx context.Context, keys []K, bulkLoader BulkLoader[K, V]) (map[K]V, error) { c.singleflight.init() - offset := c.clock.Offset() + nowNano := c.clock.NowNano() result := make(map[K]V, len(keys)) var ( misses map[K]*call[K, V] @@ -796,9 +1000,9 @@ func (c *cache[K, V]) BulkGet(ctx context.Context, keys []K, bulkLoader BulkLoad continue } - n := c.getNode(key, offset) + n := c.getNode(key, nowNano) if n != nil { - if c.withRefresh && !n.IsFresh(offset) { + if !n.IsFresh(nowNano) { if toRefresh == nil { toRefresh = make([]refreshableKey[K, V], 0, len(keys)-len(result)) } @@ -819,28 +1023,19 @@ func (c *cache[K, V]) BulkGet(ctx context.Context, keys []K, bulkLoader BulkLoad misses[key] = nil } - c.bulkRefreshKeys(ctx, toRefresh, bulkLoader) + c.bulkRefreshKeys(ctx, toRefresh, bulkLoader, false) if len(misses) == 0 { return result, nil } - loadCtx, cancel := context.WithCancel(ctx) - defer cancel() - - if c.withStats { - c.clock.Init() - } - var toLoadCalls map[K]*call[K, V] i := 0 for key := range misses { - // node.Node compute? cl, shouldLoad := c.singleflight.startCall(key, false) if shouldLoad { if toLoadCalls == nil { toLoadCalls = make(map[K]*call[K, V], len(misses)-i) } - toLoadCalls[key] = cl } misses[key] = cl @@ -850,7 +1045,7 @@ func (c *cache[K, V]) BulkGet(ctx context.Context, keys []K, bulkLoader BulkLoad var loadErr error if len(toLoadCalls) > 0 { loadErr = c.wrapLoad(func() error { - return c.singleflight.doBulkCall(loadCtx, toLoadCalls, bulkLoader.BulkLoad, c.afterDeleteCall) + return c.singleflight.doBulkCall(ctx, toLoadCalls, bulkLoader.BulkLoad, c.afterDeleteCall) }) } if loadErr != nil { @@ -886,17 +1081,15 @@ func (c *cache[K, V]) BulkGet(ctx context.Context, keys []K, bulkLoader BulkLoad } func (c *cache[K, V]) wrapLoad(fn func() error) error { - startTime := c.clock.Offset() + startTime := c.statsClock.NowNano() err := fn() - if c.withStats { - loadTime := time.Duration(c.clock.Offset() - startTime) - if err == nil || errors.Is(err, ErrNotFound) { - c.stats.RecordLoadSuccess(loadTime) - } else { - c.stats.RecordLoadFailure(loadTime) - } + loadTime := time.Duration(c.statsClock.NowNano() - startTime) + if err == nil || errors.Is(err, ErrNotFound) { + c.stats.RecordLoadSuccess(loadTime) + } else { + c.stats.RecordLoadFailure(loadTime) } var pe *panicError @@ -931,13 +1124,13 @@ func (c *cache[K, V]) Refresh(ctx context.Context, key K, loader Loader[K, V]) < c.singleflight.init() - offset := c.clock.Offset() - n := c.getNode(key, offset) + nowNano := c.clock.NowNano() + n := c.getNodeQuietly(key, nowNano) return c.refreshKey(ctx, refreshableKey[K, V]{ key: key, old: n, - }, loader) + }, loader, true) } // BulkRefresh loads a new value for each key, asynchronously. While the new value is loading the @@ -968,17 +1161,17 @@ func (c *cache[K, V]) BulkRefresh(ctx context.Context, keys []K, bulkLoader Bulk uniq[k] = struct{}{} } - offset := c.clock.Offset() + nowNano := c.clock.NowNano() toRefresh := make([]refreshableKey[K, V], 0, len(uniq)) for key := range uniq { - n := c.getNode(key, offset) + n := c.getNodeQuietly(key, nowNano) toRefresh = append(toRefresh, refreshableKey[K, V]{ key: key, old: n, }) } - return c.bulkRefreshKeys(ctx, toRefresh, bulkLoader) + return c.bulkRefreshKeys(ctx, toRefresh, bulkLoader, true) } // Invalidate discards any cached value for the key. @@ -987,24 +1180,19 @@ func (c *cache[K, V]) BulkRefresh(ctx context.Context, keys []K, bulkLoader Bulk // present. func (c *cache[K, V]) Invalidate(key K) (value V, invalidated bool) { var d node.Node[K, V] - offset := c.clock.Offset() + nowNano := c.clock.NowNano() c.hashmap.Compute(key, func(n node.Node[K, V]) node.Node[K, V] { - c.singleflight.delete(key) - if n != nil { - d = n - c.makeRetired(d) - c.notifyAtomicDeletion(d.Key(), d.Value(), CauseInvalidation) - } - return nil + d = n + return c.atomicDelete(key, d, nil, nowNano) }) - c.afterDelete(d, offset, false) + c.afterDelete(d, nowNano, false) if d != nil { return d.Value(), true } return zeroValue[V](), false } -func (c *cache[K, V]) deleteNodeFromMap(n node.Node[K, V], cause DeletionCause) node.Node[K, V] { +func (c *cache[K, V]) deleteNodeFromMap(n node.Node[K, V], nowNano int64, cause DeletionCause) node.Node[K, V] { var deleted node.Node[K, V] c.hashmap.Compute(n.Key(), func(current node.Node[K, V]) node.Node[K, V] { c.singleflight.delete(n.Key()) @@ -1013,6 +1201,7 @@ func (c *cache[K, V]) deleteNodeFromMap(n node.Node[K, V], cause DeletionCause) } if n.AsPointer() == current.AsPointer() { deleted = current + cause := getCause(deleted, nowNano, cause) c.makeRetired(deleted) c.notifyAtomicDeletion(deleted.Key(), deleted.Value(), cause) return nil @@ -1022,11 +1211,11 @@ func (c *cache[K, V]) deleteNodeFromMap(n node.Node[K, V], cause DeletionCause) return deleted } -func (c *cache[K, V]) deleteNode(n node.Node[K, V], offset int64) { - c.afterDelete(c.deleteNodeFromMap(n, CauseInvalidation), offset, true) +func (c *cache[K, V]) deleteNode(n node.Node[K, V], nowNano int64) { + c.afterDelete(c.deleteNodeFromMap(n, nowNano, CauseInvalidation), nowNano, true) } -func (c *cache[K, V]) afterDelete(deleted node.Node[K, V], offset int64, withLock bool) { +func (c *cache[K, V]) afterDelete(deleted node.Node[K, V], nowNano int64, alreadyLocked bool) { if deleted == nil { return } @@ -1037,16 +1226,12 @@ func (c *cache[K, V]) afterDelete(deleted node.Node[K, V], offset int64, withLoc } // delete - cause := CauseInvalidation - if deleted.HasExpired(offset) { - cause = CauseExpiration - } - + cause := getCause(deleted, nowNano, CauseInvalidation) t := c.getTask(deleted, nil, deleteReason, cause) - if withLock { + if alreadyLocked { c.runTask(t) } else { - c.afterWriteTask(c.getTask(deleted, nil, deleteReason, cause)) + c.afterWriteTask(t) } } @@ -1077,14 +1262,14 @@ func (c *cache[K, V]) notifyAtomicDeletion(key K, value V, cause DeletionCause) } func (c *cache[K, V]) periodicCleanUp() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() + tick := c.clock.Tick(time.Second) for { select { case <-c.doneClose: return - case <-ticker.C: + case <-tick: c.CleanUp() + c.clock.ProcessTick() } } } @@ -1095,10 +1280,10 @@ func (c *cache[K, V]) evictNode(n node.Node[K, V], nowNanos int64) { cause = CauseExpiration } - deleted := c.deleteNodeFromMap(n, cause) != nil + deleted := c.deleteNodeFromMap(n, nowNanos, cause) != nil if c.withEviction { - c.evictionPolicy.Delete(n) + c.evictionPolicy.delete(n) } if c.withExpiration { c.expirationPolicy.Delete(n) @@ -1112,22 +1297,74 @@ func (c *cache[K, V]) evictNode(n node.Node[K, V], nowNanos int64) { } } +func (c *cache[K, V]) nodes() iter.Seq[node.Node[K, V]] { + return func(yield func(node.Node[K, V]) bool) { + c.hashmap.Range(func(n node.Node[K, V]) bool { + nowNano := c.clock.NowNano() + if !n.IsAlive() || n.HasExpired(nowNano) { + c.scheduleDrainBuffers() + return true + } + + return yield(n) + }) + } +} + +func (c *cache[K, V]) entries() iter.Seq[Entry[K, V]] { + return func(yield func(Entry[K, V]) bool) { + for n := range c.nodes() { + if !yield(c.nodeToEntry(n, c.clock.NowNano())) { + return + } + } + } +} + // All returns an iterator over all entries in the cache. // // Iterator is at least weakly consistent: he is safe for concurrent use, // but if the cache is modified (including by eviction) after the iterator is // created, it is undefined which of the changes (if any) will be reflected in that iterator. func (c *cache[K, V]) All() iter.Seq2[K, V] { - offset := c.clock.Offset() return func(yield func(K, V) bool) { - c.hashmap.Range(func(n node.Node[K, V]) bool { - if !n.IsAlive() || n.HasExpired(offset) { - c.scheduleDrainBuffers() - return true + for n := range c.nodes() { + if !yield(n.Key(), n.Value()) { + return } + } + } +} - return yield(n.Key(), n.Value()) - }) +// Keys returns an iterator over all keys in the cache. +// The iteration order is not specified and is not guaranteed to be the same from one call to the next. +// +// Iterator is at least weakly consistent: he is safe for concurrent use, +// but if the cache is modified (including by eviction) after the iterator is +// created, it is undefined which of the changes (if any) will be reflected in that iterator. +func (c *cache[K, V]) Keys() iter.Seq[K] { + return func(yield func(K) bool) { + for n := range c.nodes() { + if !yield(n.Key()) { + return + } + } + } +} + +// Values returns an iterator over all values in the cache. +// The iteration order is not specified and is not guaranteed to be the same from one call to the next. +// +// Iterator is at least weakly consistent: he is safe for concurrent use, +// but if the cache is modified (including by eviction) after the iterator is +// created, it is undefined which of the changes (if any) will be reflected in that iterator. +func (c *cache[K, V]) Values() iter.Seq[V] { + return func(yield func(V) bool) { + for n := range c.nodes() { + if !yield(n.Value()) { + return + } + } } } @@ -1136,10 +1373,8 @@ func (c *cache[K, V]) All() iter.Seq2[K, V] { func (c *cache[K, V]) InvalidateAll() { c.evictionMutex.Lock() - if !c.skipReadBuffer() { - c.readBuffer.DrainTo(func(n node.Node[K, V]) {}) - } if c.withMaintenance { + c.readBuffer.DrainTo(func(n node.Node[K, V]) {}) for { t := c.writeBuffer.TryPop() if t == nil { @@ -1155,11 +1390,11 @@ func (c *cache[K, V]) InvalidateAll() { nodes = append(nodes, n) return true }) - offset := c.clock.Offset() + nowNano := c.clock.NowNano() for len(nodes) > 0 && c.writeBuffer.Size() < threshold { n := nodes[len(nodes)-1] nodes = nodes[:len(nodes)-1] - c.deleteNode(n, offset) + c.deleteNode(n, nowNano) } c.evictionMutex.Unlock() @@ -1190,7 +1425,8 @@ func (c *cache[K, V]) shouldDrainBuffers(delayable bool) bool { } func (c *cache[K, V]) skipReadBuffer() bool { - return !c.withEviction + return !c.withMaintenance || // without read buffer + (!c.withExpiration && c.withEviction && c.evictionPolicy.sketch.isNotInitialized()) } func (c *cache[K, V]) afterWriteTask(t *task[K, V]) { @@ -1206,10 +1442,7 @@ func (c *cache[K, V]) afterWriteTask(t *task[K, V]) { // In scenarios where the writing goroutines cannot make progress then they attempt to provide // assistance by performing the eviction work directly. This can resolve cases where the // maintenance task is scheduled but not running. - c.evictionMutex.Lock() - c.maintenance(t) - c.evictionMutex.Unlock() - c.rescheduleCleanUpIfIncomplete() + c.performCleanUp(t) } func (c *cache[K, V]) scheduleAfterWrite() { @@ -1324,6 +1557,10 @@ func (c *cache[K, V]) drainReadBuffer() { } func (c *cache[K, V]) drainWriteBuffer() { + if !c.withMaintenance { + return + } + for i := uint32(0); i <= maxWriteBufferSize; i++ { t := c.writeBuffer.TryPop() if t == nil { @@ -1346,7 +1583,7 @@ func (c *cache[K, V]) runTask(t *task[K, V]) { c.expirationPolicy.Add(n) } if c.withEviction { - c.evictionPolicy.Add(n, c.evictNode) + c.evictionPolicy.add(n, c.evictNode) } case updateReason: old := t.oldNode() @@ -1357,7 +1594,7 @@ func (c *cache[K, V]) runTask(t *task[K, V]) { } } if c.withEviction { - c.evictionPolicy.Update(n, old, c.evictNode) + c.evictionPolicy.update(n, old, c.evictNode) } c.notifyDeletion(old.Key(), old.Value(), t.deletionCause) case deleteReason: @@ -1365,7 +1602,7 @@ func (c *cache[K, V]) runTask(t *task[K, V]) { c.expirationPolicy.Delete(n) } if c.withEviction { - c.evictionPolicy.Delete(n) + c.evictionPolicy.delete(n) } c.notifyDeletion(n.Key(), n.Value(), t.deletionCause) default: @@ -1377,7 +1614,7 @@ func (c *cache[K, V]) runTask(t *task[K, V]) { func (c *cache[K, V]) onAccess(n node.Node[K, V]) { if c.withEviction { - c.evictionPolicy.Access(n) + c.evictionPolicy.access(n) } if c.withExpiration && !node.Equals(n.NextExp(), nil) { c.expirationPolicy.Delete(n) @@ -1389,7 +1626,7 @@ func (c *cache[K, V]) onAccess(n node.Node[K, V]) { func (c *cache[K, V]) expireNodes() { if c.withExpiration { - c.expirationPolicy.DeleteExpired(c.clock.Offset(), c.evictNode) + c.expirationPolicy.DeleteExpired(c.clock.NowNano(), c.evictNode) } } @@ -1397,14 +1634,14 @@ func (c *cache[K, V]) evictNodes() { if !c.withEviction { return } - c.evictionPolicy.EvictNodes(c.evictNode) + c.evictionPolicy.evictNodes(c.evictNode) } func (c *cache[K, V]) climb() { if !c.withEviction { return } - c.evictionPolicy.Climb() + c.evictionPolicy.climb() } func (c *cache[K, V]) getTask(n, old node.Node[K, V], writeReason reason, cause DeletionCause) *task[K, V] { @@ -1442,20 +1679,24 @@ func (c *cache[K, V]) SetMaximum(maximum uint64) { return } c.evictionMutex.Lock() - c.evictionPolicy.SetMaximumSize(maximum) + c.evictionPolicy.setMaximumSize(maximum) c.maintenance(nil) c.evictionMutex.Unlock() c.rescheduleCleanUpIfIncomplete() } // GetMaximum returns the maximum total weighted or unweighted size of this cache, depending on how the -// cache was constructed. +// cache was constructed. If this cache does not use a (weighted) size bound, then the method will return math.MaxUint64. func (c *cache[K, V]) GetMaximum() uint64 { + if !c.withEviction { + return uint64(math.MaxUint64) + } + c.evictionMutex.Lock() if c.drainStatus.Load() == required { c.maintenance(nil) } - result := c.evictionPolicy.Maximum + result := c.evictionPolicy.maximum c.evictionMutex.Unlock() c.rescheduleCleanUpIfIncomplete() return result @@ -1489,12 +1730,91 @@ func (c *cache[K, V]) WeightedSize() uint64 { if c.drainStatus.Load() == required { c.maintenance(nil) } - result := c.evictionPolicy.WeightedSize + result := c.evictionPolicy.weightedSize c.evictionMutex.Unlock() c.rescheduleCleanUpIfIncomplete() return result } +// Hottest returns an iterator for ordered traversal of the cache entries. The order of +// iteration is from the entries most likely to be retained (hottest) to the entries least +// likely to be retained (coldest). This order is determined by the eviction policy's best guess +// at the start of the iteration. +// +// WARNING: Beware that this iteration is performed within the eviction policy's exclusive lock, so the +// iteration should be short and simple. While the iteration is in progress further eviction +// maintenance will be halted. +func (c *cache[K, V]) Hottest() iter.Seq[Entry[K, V]] { + return c.evictionOrder(true) +} + +// Coldest returns an iterator for ordered traversal of the cache entries. The order of +// iteration is from the entries least likely to be retained (coldest) to the entries most +// likely to be retained (hottest). This order is determined by the eviction policy's best guess +// at the start of the iteration. +// +// WARNING: Beware that this iteration is performed within the eviction policy's exclusive lock, so the +// iteration should be short and simple. While the iteration is in progress further eviction +// maintenance will be halted. +func (c *cache[K, V]) Coldest() iter.Seq[Entry[K, V]] { + return c.evictionOrder(false) +} + +func (c *cache[K, V]) evictionOrder(hottest bool) iter.Seq[Entry[K, V]] { + if !c.withEviction { + return c.entries() + } + + return func(yield func(Entry[K, V]) bool) { + comparator := func(a node.Node[K, V], b node.Node[K, V]) int { + return cmp.Compare( + c.evictionPolicy.sketch.frequency(a.Key()), + c.evictionPolicy.sketch.frequency(b.Key()), + ) + } + + var seq iter.Seq[node.Node[K, V]] + if hottest { + secondary := xiter.MergeFunc( + c.evictionPolicy.probation.Backward(), + c.evictionPolicy.window.Backward(), + comparator, + ) + seq = xiter.Concat( + c.evictionPolicy.protected.Backward(), + secondary, + ) + } else { + primary := xiter.MergeFunc( + c.evictionPolicy.window.All(), + c.evictionPolicy.probation.All(), + func(a node.Node[K, V], b node.Node[K, V]) int { + return -comparator(a, b) + }, + ) + + seq = xiter.Concat( + primary, + c.evictionPolicy.protected.All(), + ) + } + + c.evictionMutex.Lock() + defer c.evictionMutex.Unlock() + c.maintenance(nil) + + for n := range seq { + nowNano := c.clock.NowNano() + if !n.IsAlive() || n.HasExpired(nowNano) { + continue + } + if !yield(c.nodeToEntry(n, nowNano)) { + return + } + } + } +} + func (c *cache[K, V]) makeRetired(n node.Node[K, V]) { if n != nil && c.withMaintenance && n.IsAlive() { n.Retire() @@ -1507,8 +1827,15 @@ func (c *cache[K, V]) makeDead(n node.Node[K, V]) { } if c.withEviction { - c.evictionPolicy.MakeDead(n) + c.evictionPolicy.makeDead(n) } else if !n.IsDead() { n.Die() } } + +func getCause[K comparable, V any](n node.Node[K, V], nowNano int64, cause DeletionCause) DeletionCause { + if n.HasExpired(nowNano) { + return CauseExpiration + } + return cause +} diff --git a/vendor/github.com/maypok86/otter/v2/clock.go b/vendor/github.com/maypok86/otter/v2/clock.go new file mode 100644 index 00000000..1d7aec1d --- /dev/null +++ b/vendor/github.com/maypok86/otter/v2/clock.go @@ -0,0 +1,232 @@ +// Copyright (c) 2025 Alexey Mayshev and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otter + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/maypok86/otter/v2/internal/xmath" +) + +// Clock is a time source that +// - Returns a time value representing the number of nanoseconds elapsed since some +// fixed but arbitrary point in time +// - Returns a channel that delivers “ticks” of a clock at intervals. +type Clock interface { + // NowNano returns the number of nanoseconds elapsed since this clock's fixed point of reference. + // + // By default, time.Now().UnixNano() is used. + NowNano() int64 + // Tick returns a channel that delivers “ticks” of a clock at intervals. + // + // The cache uses this method only for proactive expiration and calls Tick(time.Second) in a separate goroutine. + // + // By default, [time.Tick] is used. + Tick(duration time.Duration) <-chan time.Time +} + +type timeSource interface { + Clock + Init() + Sleep(duration time.Duration) + ProcessTick() +} + +func newTimeSource(clock Clock) timeSource { + if clock == nil { + return &realSource{} + } + if r, ok := clock.(*realSource); ok { + return r + } + if f, ok := clock.(*fakeSource); ok { + return f + } + return newCustomSource(clock) +} + +type customSource struct { + clock Clock + isInitialized atomic.Bool +} + +func newCustomSource(clock Clock) *customSource { + return &customSource{ + clock: clock, + } +} + +func (cs *customSource) Init() { + if !cs.isInitialized.Load() { + cs.isInitialized.Store(true) + } +} + +func (cs *customSource) NowNano() int64 { + if !cs.isInitialized.Load() { + return 0 + } + return cs.clock.NowNano() +} + +func (cs *customSource) Tick(duration time.Duration) <-chan time.Time { + return cs.clock.Tick(duration) +} + +func (cs *customSource) Sleep(duration time.Duration) { + time.Sleep(duration) +} + +func (cs *customSource) ProcessTick() {} + +type realSource struct { + initMutex sync.Mutex + isInitialized atomic.Bool + start time.Time + startNanos atomic.Int64 +} + +func (c *realSource) Init() { + if !c.isInitialized.Load() { + c.initMutex.Lock() + if !c.isInitialized.Load() { + now := time.Now() + c.start = now + c.startNanos.Store(now.UnixNano()) + c.isInitialized.Store(true) + } + c.initMutex.Unlock() + } +} + +func (c *realSource) NowNano() int64 { + if !c.isInitialized.Load() { + return 0 + } + return xmath.SaturatedAdd(c.startNanos.Load(), time.Since(c.start).Nanoseconds()) +} + +func (c *realSource) Tick(duration time.Duration) <-chan time.Time { + return time.Tick(duration) +} + +func (c *realSource) Sleep(duration time.Duration) { + time.Sleep(duration) +} + +func (c *realSource) ProcessTick() {} + +type fakeSource struct { + mutex sync.Mutex + now time.Time + initOnce sync.Once + sleeps chan time.Duration + tickWg sync.WaitGroup + sleepWg sync.WaitGroup + firstSleep atomic.Bool + withTick atomic.Bool + ticker chan time.Time + enableTickOnce sync.Once + enableTick chan time.Duration +} + +func (f *fakeSource) Init() { + f.initOnce.Do(func() { + f.mutex.Lock() + now := time.Now() + f.now = now + f.sleeps = make(chan time.Duration) + f.firstSleep.Store(true) + f.enableTick = make(chan time.Duration) + f.ticker = make(chan time.Time, 1) + f.mutex.Unlock() + + go func() { + var ( + dur time.Duration + d time.Duration + ) + enabled := false + last := now + for { + select { + case d = <-f.enableTick: + enabled = true + for d <= dur { + if f.firstSleep.Load() { + f.tickWg.Add(1) + f.ticker <- last + f.tickWg.Wait() + f.firstSleep.Store(false) + } + last = last.Add(d) + f.tickWg.Add(1) + f.ticker <- last + dur -= d + } + case s := <-f.sleeps: + if enabled && f.firstSleep.Load() { + f.tickWg.Add(1) + f.ticker <- last + f.tickWg.Wait() + f.firstSleep.Store(false) + } + f.mutex.Lock() + f.now = f.now.Add(s) + f.mutex.Unlock() + dur += s + if enabled { + for d <= dur { + last = last.Add(d) + f.tickWg.Add(1) + f.ticker <- last + dur -= d + } + } + f.sleepWg.Done() + } + } + }() + }) +} + +func (f *fakeSource) NowNano() int64 { + return f.getNow().UnixNano() +} + +func (f *fakeSource) Tick(d time.Duration) <-chan time.Time { + f.enableTickOnce.Do(func() { + f.enableTick <- d + }) + return f.ticker +} + +func (f *fakeSource) Sleep(d time.Duration) { + f.sleepWg.Add(1) + f.sleeps <- d + f.sleepWg.Wait() +} + +func (f *fakeSource) getNow() time.Time { + f.mutex.Lock() + defer f.mutex.Unlock() + return f.now +} + +func (f *fakeSource) ProcessTick() { + f.tickWg.Done() +} diff --git a/vendor/github.com/maypok86/otter/v2/internal/clock/real.go b/vendor/github.com/maypok86/otter/v2/internal/clock/real.go deleted file mode 100644 index 3be390b7..00000000 --- a/vendor/github.com/maypok86/otter/v2/internal/clock/real.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) 2024 Alexey Mayshev and contributors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package clock - -import ( - "math" - "sync" - "sync/atomic" - "time" -) - -type Real struct { - initMutex sync.Mutex - isInitialized atomic.Bool - start time.Time - startNanos atomic.Int64 -} - -func (c *Real) Init() { - if !c.isInitialized.Load() { - c.initMutex.Lock() - if !c.isInitialized.Load() { - now := time.Now() - c.start = now - c.startNanos.Store(now.UnixNano()) - c.isInitialized.Store(true) - } - c.initMutex.Unlock() - } -} - -func (c *Real) Offset() int64 { - if !c.isInitialized.Load() { - return 0 - } - return saturatedAdd(c.startNanos.Load(), time.Since(c.start).Nanoseconds()) -} - -func saturatedAdd(a, b int64) int64 { - s := a + b - if s < a || s < b { - return math.MaxInt64 - } - return s -} diff --git a/vendor/github.com/maypok86/otter/v2/internal/deque/linked.go b/vendor/github.com/maypok86/otter/v2/internal/deque/linked.go index 8172a090..6923df71 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/deque/linked.go +++ b/vendor/github.com/maypok86/otter/v2/internal/deque/linked.go @@ -15,6 +15,8 @@ package deque import ( + "iter" + "github.com/maypok86/otter/v2/internal/generated/node" ) @@ -172,6 +174,30 @@ func (d *Linked[K, V]) Tail() node.Node[K, V] { return d.tail } +func (d *Linked[K, V]) All() iter.Seq[node.Node[K, V]] { + return func(yield func(node.Node[K, V]) bool) { + cursor := d.head + for !node.Equals(cursor, nil) { + if !yield(cursor) { + return + } + cursor = d.getNext(cursor) + } + } +} + +func (d *Linked[K, V]) Backward() iter.Seq[node.Node[K, V]] { + return func(yield func(node.Node[K, V]) bool) { + cursor := d.tail + for !node.Equals(cursor, nil) { + if !yield(cursor) { + return + } + cursor = d.getPrev(cursor) + } + } +} + func (d *Linked[K, V]) setPrev(to, n node.Node[K, V]) { if d.isExp { to.SetPrevExp(n) diff --git a/vendor/github.com/maypok86/otter/v2/internal/expiration/variable.go b/vendor/github.com/maypok86/otter/v2/internal/expiration/variable.go index 7142d180..ee9b61ac 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/expiration/variable.go +++ b/vendor/github.com/maypok86/otter/v2/internal/expiration/variable.go @@ -54,7 +54,7 @@ func NewVariable[K comparable, V any](nodeManager *node.Manager[K, V]) *Variable for j := 0; j < len(wheel[i]); j++ { var k K var v V - fn := nodeManager.Create(k, v, math.MaxUint32, math.MaxUint32, 1) + fn := nodeManager.Create(k, v, math.MaxInt64, math.MaxInt64, 1) fn.SetPrevExp(fn) fn.SetNextExp(fn) wheel[i][j] = fn @@ -94,7 +94,6 @@ func (v *Variable[K, V]) Delete(n node.Node[K, V]) { } func (v *Variable[K, V]) DeleteExpired(nowNanos int64, expireNode func(n node.Node[K, V], nowNanos int64)) { - //nolint:gosec // there is no overflow currentTime := uint64(nowNanos) prevTime := v.time v.time = currentTime @@ -117,10 +116,7 @@ func (v *Variable[K, V]) deleteExpiredFromBucket( expireNode func(n node.Node[K, V], nowNanos int64), ) { mask := buckets[index] - 1 - steps := buckets[index] - if delta < steps { - steps = delta - } + steps := min(delta+1, buckets[index]) start := prevTicks & mask end := start + steps timerWheel := v.wheel[index] @@ -135,8 +131,7 @@ func (v *Variable[K, V]) deleteExpiredFromBucket( n.SetPrevExp(nil) n.SetNextExp(nil) - //nolint:gosec // there is no overflow - if uint64(n.ExpiresAt()) <= v.time { + if uint64(n.ExpiresAt()) < v.time { expireNode(n, int64(v.time)) } else { v.Add(n) @@ -147,25 +142,6 @@ func (v *Variable[K, V]) deleteExpiredFromBucket( } } -/* -func (v *Variable[K, V]) Clear() { - for i := 0; i < len(v.wheel); i++ { - for j := 0; j < len(v.wheel[i]); j++ { - root := v.wheel[i][j] - n := root.NextExp() - // NOTE(maypok86): Maybe we should use the same approach as in DeleteExpired? - - for !node.Equals(n, root) { - next := n.NextExp() - v.Delete(n) - - n = next - } - } - } -} -*/ - // link adds the entry at the tail of the bucket's list. func link[K comparable, V any](root, n node.Node[K, V]) { n.SetPrevExp(root.PrevExp()) diff --git a/vendor/github.com/maypok86/otter/v2/internal/generated/node/berw.go b/vendor/github.com/maypok86/otter/v2/internal/generated/node/berw.go index 6eef8c69..4b881c03 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/generated/node/berw.go +++ b/vendor/github.com/maypok86/otter/v2/internal/generated/node/berw.go @@ -28,7 +28,6 @@ type BERW[K comparable, V any] struct { refreshableAt atomic.Int64 weight uint32 state atomic.Uint32 - frequency uint8 queueType uint8 } diff --git a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bew.go b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bew.go index eb4ec018..18fbe059 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bew.go +++ b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bew.go @@ -25,7 +25,6 @@ type BEW[K comparable, V any] struct { expiresAt atomic.Int64 weight uint32 state atomic.Uint32 - frequency uint8 queueType uint8 } diff --git a/vendor/github.com/maypok86/otter/v2/internal/generated/node/brw.go b/vendor/github.com/maypok86/otter/v2/internal/generated/node/brw.go index 8632791e..c0f0686e 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/generated/node/brw.go +++ b/vendor/github.com/maypok86/otter/v2/internal/generated/node/brw.go @@ -23,7 +23,6 @@ type BRW[K comparable, V any] struct { refreshableAt atomic.Int64 weight uint32 state atomic.Uint32 - frequency uint8 queueType uint8 } diff --git a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bs.go b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bs.go index 88707605..448b69cb 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bs.go +++ b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bs.go @@ -19,7 +19,6 @@ type BS[K comparable, V any] struct { prev *BS[K, V] next *BS[K, V] state atomic.Uint32 - frequency uint8 queueType uint8 } diff --git a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bse.go b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bse.go index 5d572370..9a27c5f7 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bse.go +++ b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bse.go @@ -24,7 +24,6 @@ type BSE[K comparable, V any] struct { nextExp *BSE[K, V] expiresAt atomic.Int64 state atomic.Uint32 - frequency uint8 queueType uint8 } diff --git a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bser.go b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bser.go index 9e374b5e..4eb61764 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bser.go +++ b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bser.go @@ -27,7 +27,6 @@ type BSER[K comparable, V any] struct { expiresAt atomic.Int64 refreshableAt atomic.Int64 state atomic.Uint32 - frequency uint8 queueType uint8 } diff --git a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bsr.go b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bsr.go index 56005c33..d00edcbf 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bsr.go +++ b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bsr.go @@ -22,7 +22,6 @@ type BSR[K comparable, V any] struct { next *BSR[K, V] refreshableAt atomic.Int64 state atomic.Uint32 - frequency uint8 queueType uint8 } diff --git a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bw.go b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bw.go index 84f8f84c..23cca819 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/generated/node/bw.go +++ b/vendor/github.com/maypok86/otter/v2/internal/generated/node/bw.go @@ -20,7 +20,6 @@ type BW[K comparable, V any] struct { next *BW[K, V] weight uint32 state atomic.Uint32 - frequency uint8 queueType uint8 } diff --git a/vendor/github.com/maypok86/otter/v2/internal/xiter/xiter.go b/vendor/github.com/maypok86/otter/v2/internal/xiter/xiter.go new file mode 100644 index 00000000..836060fd --- /dev/null +++ b/vendor/github.com/maypok86/otter/v2/internal/xiter/xiter.go @@ -0,0 +1,63 @@ +// Copyright (c) 2025 Alexey Mayshev and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xiter + +import "iter" + +// Concat returns an iterator over the concatenation of the sequences. +func Concat[V any](seqs ...iter.Seq[V]) iter.Seq[V] { + return func(yield func(V) bool) { + for _, seq := range seqs { + for e := range seq { + if !yield(e) { + return + } + } + } + } +} + +// MergeFunc merges two sequences of values ordered by the function f. +// Values appear in the output once for each time they appear in x +// and once for each time they appear in y. +// When equal values appear in both sequences, +// the output contains the values from x before the values from y. +// If the two input sequences are not ordered by f, +// the output sequence will not be ordered by f, +// but it will still contain every value from x and y exactly once. +func MergeFunc[V any](x, y iter.Seq[V], f func(V, V) int) iter.Seq[V] { + return func(yield func(V) bool) { + next, stop := iter.Pull(y) + defer stop() + v2, ok2 := next() + for v1 := range x { + for ok2 && f(v1, v2) > 0 { + if !yield(v2) { + return + } + v2, ok2 = next() + } + if !yield(v1) { + return + } + } + for ok2 { + if !yield(v2) { + return + } + v2, ok2 = next() + } + } +} diff --git a/vendor/github.com/maypok86/otter/v2/internal/xmath/xmath.go b/vendor/github.com/maypok86/otter/v2/internal/xmath/xmath.go index 6b720461..a2e1aea0 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/xmath/xmath.go +++ b/vendor/github.com/maypok86/otter/v2/internal/xmath/xmath.go @@ -14,6 +14,8 @@ package xmath +import "math" + func Abs(a int64) int64 { if a < 0 { return -a @@ -50,3 +52,11 @@ func RoundUpPowerOf264(x uint64) uint64 { x++ return x } + +func SaturatedAdd(a, b int64) int64 { + s := a + b + if s < a || s < b { + return math.MaxInt64 + } + return s +} diff --git a/vendor/github.com/maypok86/otter/v2/mkdocs.yml b/vendor/github.com/maypok86/otter/v2/mkdocs.yml index 7206f3ee..bb13f7f5 100644 --- a/vendor/github.com/maypok86/otter/v2/mkdocs.yml +++ b/vendor/github.com/maypok86/otter/v2/mkdocs.yml @@ -73,7 +73,7 @@ extra: - icon: fontawesome/brands/github link: https://github.com/maypok86 - icon: fontawesome/brands/golang - link: https://pkg.go.dev/github.com/maypok86/otter/ + link: https://pkg.go.dev/github.com/maypok86/otter/v2 # Extensions markdown_extensions: @@ -138,7 +138,9 @@ nav: - Loading: user-guide/v2/features/loading.md - Refresh: user-guide/v2/features/refresh.md - Bulk operations: user-guide/v2/features/bulk.md + - Compute: user-guide/v2/features/compute.md - Statistics: user-guide/v2/features/statistics.md + - Persistence: user-guide/v2/features/persistence.md - Extension: user-guide/v2/features/extension.md - Iteration: user-guide/v2/features/iteration.md - v1 manual: diff --git a/vendor/github.com/maypok86/otter/v2/options.go b/vendor/github.com/maypok86/otter/v2/options.go index 94eaac08..7b9ace97 100644 --- a/vendor/github.com/maypok86/otter/v2/options.go +++ b/vendor/github.com/maypok86/otter/v2/options.go @@ -137,6 +137,14 @@ type Options[K comparable, V any] struct { // Beware that configuring a cache with an executor that discards tasks or never runs them may // experience non-deterministic behavior. Executor func(fn func()) + // Clock specifies a nanosecond-precision time source for use in determining when entries should be + // expired or refreshed. By default, time.Now().UnixNano() is used. + // + // The primary intent of this option is to facilitate testing of caches which have been configured + // with ExpiryCalculator or RefreshCalculator. + // + // NOTE: this clock is not used when recording statistics. + Clock Clock // Logger specifies the Logger implementation that will be used for logging warning and errors. // // The cache will use slog.Default() by default. diff --git a/vendor/github.com/maypok86/otter/v2/persistence.go b/vendor/github.com/maypok86/otter/v2/persistence.go new file mode 100644 index 00000000..b717fc7e --- /dev/null +++ b/vendor/github.com/maypok86/otter/v2/persistence.go @@ -0,0 +1,155 @@ +// Copyright (c) 2025 Alexey Mayshev and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otter + +import ( + "encoding/gob" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "time" +) + +// LoadCacheFromFile loads cache data from the given filePath. +// +// See SaveCacheToFile for saving cache data to file. +func LoadCacheFromFile[K comparable, V any](c *Cache[K, V], filePath string) error { + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("otter: open file %s: %w", filePath, err) + } + //nolint:errcheck // it's ok + defer file.Close() + + return LoadCacheFrom(c, file) +} + +// LoadCacheFrom loads cache data from the given [io.Reader]. +// +// See SaveCacheToFile for saving cache data to file. +func LoadCacheFrom[K comparable, V any](c *Cache[K, V], r io.Reader) error { + dec := gob.NewDecoder(r) + + var savedMaximum uint64 + if err := dec.Decode(&savedMaximum); err != nil { + return fmt.Errorf("otter: decode maximum: %w", err) + } + + maximum := min(savedMaximum, c.GetMaximum()) + maximum2 := maximum / 4 + maximum1 := 2 * maximum2 + size := uint64(0) + for size < maximum { + var entry Entry[K, V] + if err := dec.Decode(&entry); err != nil { + if errors.Is(err, io.EOF) { + break + } + + return fmt.Errorf("otter: decode entry: %w", err) + } + + nowNano := c.cache.clock.NowNano() + if c.cache.withExpiration && entry.ExpiresAtNano < nowNano { + continue + } + c.Set(entry.Key, entry.Value) + if c.cache.withExpiration && entry.ExpiresAtNano != unreachableExpiresAt { + expiresAfter := max(1, time.Duration(entry.ExpiresAtNano-nowNano)) + c.SetExpiresAfter(entry.Key, expiresAfter) + } + if c.cache.withRefresh && entry.RefreshableAtNano != unreachableRefreshableAt { + refreshableAfter := max(1, time.Duration(entry.RefreshableAtNano-nowNano)) + c.SetRefreshableAfter(entry.Key, refreshableAfter) + } + size += uint64(entry.Weight) + + if size <= maximum2 { + c.GetIfPresent(entry.Key) + c.GetIfPresent(entry.Key) + continue + } + if size <= maximum1 { + c.GetIfPresent(entry.Key) + continue + } + } + + return nil +} + +// SaveCacheToFile atomically saves cache data to the given filePath. +// +// SaveCacheToFile may be called concurrently with other operations on the cache. +// +// The saved data may be loaded with LoadCacheFromFile. +// +// WARNING: Beware that this operation is performed within the eviction policy's exclusive lock. +// While the operation is in progress further eviction maintenance will be halted. +func SaveCacheToFile[K comparable, V any](c *Cache[K, V], filePath string) error { + // Create dir if it doesn't exist. + dir := filepath.Dir(filePath) + if _, err := os.Stat(dir); err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("otter: stat %s: %w", dir, err) + } + if err := os.MkdirAll(dir, 0o755); err != nil { + return fmt.Errorf("otter: create dir %s: %w", dir, err) + } + } + + file, err := os.Create(filePath) + if err != nil { + return fmt.Errorf("otter: create file %s: %w", filePath, err) + } + //nolint:errcheck // it's ok + defer file.Close() + + return SaveCacheTo(c, file) +} + +// SaveCacheTo atomically saves cache data to the given [io.Writer]. +// +// SaveCacheToFile may be called concurrently with other operations on the cache. +// +// The saved data may be loaded with LoadCacheFrom. +// +// WARNING: Beware that this operation is performed within the eviction policy's exclusive lock. +// While the operation is in progress further eviction maintenance will be halted. +func SaveCacheTo[K comparable, V any](c *Cache[K, V], w io.Writer) error { + enc := gob.NewEncoder(w) + + maximum := c.GetMaximum() + if err := enc.Encode(maximum); err != nil { + return fmt.Errorf("otter: encode maximum: %w", err) + } + + size := uint64(0) + for entry := range c.Hottest() { + if size >= maximum { + break + } + + if err := enc.Encode(entry); err != nil { + return fmt.Errorf("otter: encode entry: %w", err) + } + + size += uint64(entry.Weight) + } + + return nil +} diff --git a/vendor/github.com/maypok86/otter/v2/internal/eviction/tinylfu/policy.go b/vendor/github.com/maypok86/otter/v2/policy.go similarity index 52% rename from vendor/github.com/maypok86/otter/v2/internal/eviction/tinylfu/policy.go rename to vendor/github.com/maypok86/otter/v2/policy.go index b96b2543..b141b8e6 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/eviction/tinylfu/policy.go +++ b/vendor/github.com/maypok86/otter/v2/policy.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tinylfu +package otter import ( "github.com/maypok86/otter/v2/internal/deque" @@ -25,83 +25,83 @@ const ( // The initial percent of the maximum weighted capacity dedicated to the main space. percentMain = 0.99 - // PercentMainProtected is the percent of the maximum weighted capacity dedicated to the main's protected space. - PercentMainProtected = 0.80 + // percentMainProtected is the percent of the maximum weighted capacity dedicated to the main's protected space. + percentMainProtected = 0.80 // The difference in hit rates that restarts the climber. hillClimberRestartThreshold = 0.05 // The percent of the total size to adapt the window by. hillClimberStepPercent = 0.0625 // The rate to decrease the step size to adapt by. hillClimberStepDecayRate = 0.98 - // AdmitHashdosThreshold is the minimum popularity for allowing randomized admission. - AdmitHashdosThreshold = 6 + // admitHashdosThreshold is the minimum popularity for allowing randomized admission. + admitHashdosThreshold = 6 // The maximum number of entries that can be transferred between queues. queueTransferThreshold = 1_000 ) -type Policy[K comparable, V any] struct { - Sketch *Sketch[K] - Window *deque.Linked[K, V] - Probation *deque.Linked[K, V] - Protected *deque.Linked[K, V] - Maximum uint64 - WeightedSize uint64 - WindowMaximum uint64 - WindowWeightedSize uint64 - MainProtectedMaximum uint64 - MainProtectedWeightedSize uint64 - StepSize float64 - Adjustment int64 - HitsInSample uint64 - MissesInSample uint64 - PreviousSampleHitRate float64 - IsWeighted bool - Rand func() uint32 +type policy[K comparable, V any] struct { + sketch *sketch[K] + window *deque.Linked[K, V] + probation *deque.Linked[K, V] + protected *deque.Linked[K, V] + maximum uint64 + weightedSize uint64 + windowMaximum uint64 + windowWeightedSize uint64 + mainProtectedMaximum uint64 + mainProtectedWeightedSize uint64 + stepSize float64 + adjustment int64 + hitsInSample uint64 + missesInSample uint64 + previousSampleHitRate float64 + isWeighted bool + rand func() uint32 } -func NewPolicy[K comparable, V any](isWeighted bool) *Policy[K, V] { - return &Policy[K, V]{ - Sketch: newSketch[K](), - Window: deque.NewLinked[K, V](isExp), - Probation: deque.NewLinked[K, V](isExp), - Protected: deque.NewLinked[K, V](isExp), - IsWeighted: isWeighted, - Rand: xruntime.Fastrand, +func newPolicy[K comparable, V any](isWeighted bool) *policy[K, V] { + return &policy[K, V]{ + sketch: newSketch[K](), + window: deque.NewLinked[K, V](isExp), + probation: deque.NewLinked[K, V](isExp), + protected: deque.NewLinked[K, V](isExp), + isWeighted: isWeighted, + rand: xruntime.Fastrand, } } -// Access updates the eviction policy based on node accesses. -func (p *Policy[K, V]) Access(n node.Node[K, V]) { - p.Sketch.Increment(n.Key()) +// access updates the eviction policy based on node accesses. +func (p *policy[K, V]) access(n node.Node[K, V]) { + p.sketch.increment(n.Key()) switch { case n.InWindow(): - reorder(p.Window, n) + reorder(p.window, n) case n.InMainProbation(): p.reorderProbation(n) case n.InMainProtected(): - reorder(p.Protected, n) + reorder(p.protected, n) } - p.HitsInSample++ + p.hitsInSample++ } -// Add adds node to the eviction policy. -func (p *Policy[K, V]) Add(n node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) { +// add adds node to the eviction policy. +func (p *policy[K, V]) add(n node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) { nodeWeight := uint64(n.Weight()) - p.WeightedSize += nodeWeight - p.WindowWeightedSize += nodeWeight - if p.WeightedSize >= p.Maximum>>1 { + p.weightedSize += nodeWeight + p.windowWeightedSize += nodeWeight + if p.weightedSize >= p.maximum>>1 { // Lazily initialize when close to the maximum - capacity := p.Maximum - if p.IsWeighted { + capacity := p.maximum + if p.isWeighted { //nolint:gosec // there's no overflow - capacity = uint64(p.Window.Len()) + uint64(p.Probation.Len()) + uint64(p.Protected.Len()) + capacity = uint64(p.window.Len()) + uint64(p.probation.Len()) + uint64(p.protected.Len()) } - p.Sketch.EnsureCapacity(capacity) + p.sketch.ensureCapacity(capacity) } - p.Sketch.Increment(n.Key()) - p.MissesInSample++ + p.sketch.increment(n.Key()) + p.missesInSample++ // ignore out-of-order write operations if !n.IsAlive() { @@ -109,144 +109,140 @@ func (p *Policy[K, V]) Add(n node.Node[K, V], evictNode func(n node.Node[K, V], } switch { - case nodeWeight > p.Maximum: + case nodeWeight > p.maximum: evictNode(n, 0) - case nodeWeight > p.WindowMaximum: - p.Window.PushFront(n) + case nodeWeight > p.windowMaximum: + p.window.PushFront(n) default: - p.Window.PushBack(n) + p.window.PushBack(n) } } -func (p *Policy[K, V]) Update(n, old node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) { +func (p *policy[K, V]) update(n, old node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) { nodeWeight := uint64(n.Weight()) p.updateNode(n, old) switch { case n.InWindow(): - p.WindowWeightedSize += nodeWeight + p.windowWeightedSize += nodeWeight switch { - case nodeWeight > p.Maximum: + case nodeWeight > p.maximum: evictNode(n, 0) - case nodeWeight <= p.WindowMaximum: - p.Access(n) - case p.Window.Contains(n): - p.Window.MoveToFront(n) + case nodeWeight <= p.windowMaximum: + p.access(n) + case p.window.Contains(n): + p.window.MoveToFront(n) } case n.InMainProbation(): - if nodeWeight <= p.Maximum { - p.Access(n) + if nodeWeight <= p.maximum { + p.access(n) } else { evictNode(n, 0) } case n.InMainProtected(): - p.MainProtectedWeightedSize += nodeWeight - if nodeWeight <= p.Maximum { - p.Access(n) + p.mainProtectedWeightedSize += nodeWeight + if nodeWeight <= p.maximum { + p.access(n) } else { evictNode(n, 0) } } - p.WeightedSize += nodeWeight + p.weightedSize += nodeWeight } -func (p *Policy[K, V]) updateNode(n, old node.Node[K, V]) { +func (p *policy[K, V]) updateNode(n, old node.Node[K, V]) { n.SetQueueType(old.GetQueueType()) switch { case n.InWindow(): - p.Window.UpdateNode(n, old) + p.window.UpdateNode(n, old) case n.InMainProbation(): - p.Probation.UpdateNode(n, old) + p.probation.UpdateNode(n, old) default: - p.Protected.UpdateNode(n, old) + p.protected.UpdateNode(n, old) } - p.MakeDead(old) + p.makeDead(old) } -// Delete deletes node from the eviction policy. -func (p *Policy[K, V]) Delete(n node.Node[K, V]) { +// delete deletes node from the eviction policy. +func (p *policy[K, V]) delete(n node.Node[K, V]) { // add may not have been processed yet switch { case n.InWindow(): - p.Window.Delete(n) + p.window.Delete(n) case n.InMainProbation(): - p.Probation.Delete(n) + p.probation.Delete(n) default: - p.Protected.Delete(n) + p.protected.Delete(n) } - p.MakeDead(n) + p.makeDead(n) } -func (p *Policy[K, V]) MakeDead(n node.Node[K, V]) { +func (p *policy[K, V]) makeDead(n node.Node[K, V]) { if !n.IsDead() { nodeWeight := uint64(n.Weight()) if n.InWindow() { - p.WindowWeightedSize -= nodeWeight + p.windowWeightedSize -= nodeWeight } else if n.InMainProtected() { - p.MainProtectedWeightedSize -= nodeWeight + p.mainProtectedWeightedSize -= nodeWeight } - p.WeightedSize -= nodeWeight + p.weightedSize -= nodeWeight n.Die() } } -func (p *Policy[K, V]) SetMaximumSize(maximum uint64) { - if maximum == p.Maximum { +func (p *policy[K, V]) setMaximumSize(maximum uint64) { + if maximum == p.maximum { return } window := maximum - uint64(percentMain*float64(maximum)) - mainProtected := uint64(PercentMainProtected * float64(maximum-window)) + mainProtected := uint64(percentMainProtected * float64(maximum-window)) - p.Maximum = maximum - p.WindowMaximum = window - p.MainProtectedMaximum = mainProtected + p.maximum = maximum + p.windowMaximum = window + p.mainProtectedMaximum = mainProtected - p.HitsInSample = 0 - p.MissesInSample = 0 - p.StepSize = -hillClimberStepPercent * float64(maximum) + p.hitsInSample = 0 + p.missesInSample = 0 + p.stepSize = -hillClimberStepPercent * float64(maximum) - if p.Sketch != nil && !p.IsWeighted && p.WeightedSize >= (maximum>>1) { - // Lazily initialize when close to the maximum Size - p.Sketch.EnsureCapacity(maximum) + if p.sketch != nil && !p.isWeighted && p.weightedSize >= (maximum>>1) { + // Lazily initialize when close to the maximum size + p.sketch.ensureCapacity(maximum) } } -func (p *Policy[K, V]) EnsureCapacity(capacity uint64) { - p.Sketch.EnsureCapacity(capacity) -} - // Promote the node from probation to protected on access. -func (p *Policy[K, V]) reorderProbation(n node.Node[K, V]) { +func (p *policy[K, V]) reorderProbation(n node.Node[K, V]) { nodeWeight := uint64(n.Weight()) - if p.Probation.NotContains(n) { + if p.probation.NotContains(n) { // Ignore stale accesses for an entry that is no longer present return - } else if nodeWeight > p.MainProtectedMaximum { - reorder(p.Probation, n) + } else if nodeWeight > p.mainProtectedMaximum { + reorder(p.probation, n) return } // If the protected space exceeds its maximum, the LRU items are demoted to the probation space. // This is deferred to the adaption phase at the end of the maintenance cycle. - p.MainProtectedWeightedSize += nodeWeight - p.Probation.Delete(n) - p.Protected.PushBack(n) + p.mainProtectedWeightedSize += nodeWeight + p.probation.Delete(n) + p.protected.PushBack(n) n.MakeMainProtected() } -func (p *Policy[K, V]) EvictNodes(evictNode func(n node.Node[K, V], nowNanos int64)) { - candidate := p.EvictFromWindow() +func (p *policy[K, V]) evictNodes(evictNode func(n node.Node[K, V], nowNanos int64)) { + candidate := p.evictFromWindow() p.evictFromMain(candidate, evictNode) } -func (p *Policy[K, V]) EvictFromWindow() node.Node[K, V] { +func (p *policy[K, V]) evictFromWindow() node.Node[K, V] { var first node.Node[K, V] - n := p.Window.Head() - for p.WindowWeightedSize > p.WindowMaximum { - // The pending operations will adjust the Size to reflect the correct weight + n := p.window.Head() + for p.windowWeightedSize > p.windowMaximum { + // The pending operations will adjust the size to reflect the correct weight if node.Equals(n, nil) { break } @@ -255,43 +251,43 @@ func (p *Policy[K, V]) EvictFromWindow() node.Node[K, V] { nodeWeight := uint64(n.Weight()) if nodeWeight != 0 { n.MakeMainProbation() - p.Window.Delete(n) - p.Probation.PushBack(n) + p.window.Delete(n) + p.probation.PushBack(n) if first == nil { first = n } - p.WindowWeightedSize -= nodeWeight + p.windowWeightedSize -= nodeWeight } n = next } return first } -func (p *Policy[K, V]) evictFromMain(candidate node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) { +func (p *policy[K, V]) evictFromMain(candidate node.Node[K, V], evictNode func(n node.Node[K, V], nowNanos int64)) { victimQueue := node.InMainProbationQueue candidateQueue := node.InMainProbationQueue - victim := p.Probation.Head() - for p.WeightedSize > p.Maximum { + victim := p.probation.Head() + for p.weightedSize > p.maximum { // Search the admission window for additional candidates if node.Equals(candidate, nil) && candidateQueue == node.InMainProbationQueue { - candidate = p.Window.Head() + candidate = p.window.Head() candidateQueue = node.InWindowQueue } // Try evicting from the protected and window queues if node.Equals(candidate, nil) && node.Equals(victim, nil) { if victimQueue == node.InMainProbationQueue { - victim = p.Protected.Head() + victim = p.protected.Head() victimQueue = node.InMainProtectedQueue continue } else if victimQueue == node.InMainProtectedQueue { - victim = p.Window.Head() + victim = p.window.Head() victimQueue = node.InWindowQueue continue } - // The pending operations will adjust the Size to reflect the correct weight + // The pending operations will adjust the size to reflect the correct weight break } @@ -340,7 +336,7 @@ func (p *Policy[K, V]) evictFromMain(candidate node.Node[K, V], evictNode func(n } // Evict immediately if the candidate's weight exceeds the maximum - if uint64(candidate.Weight()) > p.Maximum { + if uint64(candidate.Weight()) > p.maximum { evict := candidate candidate = candidate.Next() evictNode(evict, 0) @@ -348,7 +344,7 @@ func (p *Policy[K, V]) evictFromMain(candidate node.Node[K, V], evictNode func(n } // Evict the entry with the lowest frequency - if p.Admit(candidate.Key(), victim.Key()) { + if p.admit(candidate.Key(), victim.Key()) { evict := victim victim = victim.Next() evictNode(evict, 0) @@ -361,25 +357,25 @@ func (p *Policy[K, V]) evictFromMain(candidate node.Node[K, V], evictNode func(n } } -func (p *Policy[K, V]) Admit(candidateKey, victimKey K) bool { - victimFreq := p.Sketch.Frequency(victimKey) - candidateFreq := p.Sketch.Frequency(candidateKey) +func (p *policy[K, V]) admit(candidateKey, victimKey K) bool { + victimFreq := p.sketch.frequency(victimKey) + candidateFreq := p.sketch.frequency(candidateKey) if candidateFreq > victimFreq { return true } - if candidateFreq >= AdmitHashdosThreshold { + if candidateFreq >= admitHashdosThreshold { // The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack // exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm // candidate reduces the number of random acceptances to minimize the impact on the hit rate. - return (p.Rand() & 127) == 0 + return (p.rand() & 127) == 0 } return false } -func (p *Policy[K, V]) Climb() { +func (p *policy[K, V]) climb() { p.determineAdjustment() p.demoteFromMainProtected() - amount := p.Adjustment + amount := p.adjustment if amount == 0 { return } @@ -390,24 +386,24 @@ func (p *Policy[K, V]) Climb() { } } -func (p *Policy[K, V]) determineAdjustment() { - if p.Sketch.IsNotInitialized() { - p.PreviousSampleHitRate = 0.0 - p.MissesInSample = 0 - p.HitsInSample = 0 +func (p *policy[K, V]) determineAdjustment() { + if p.sketch.isNotInitialized() { + p.previousSampleHitRate = 0.0 + p.missesInSample = 0 + p.hitsInSample = 0 return } - requestCount := p.HitsInSample + p.MissesInSample - if requestCount < p.Sketch.SampleSize { + requestCount := p.hitsInSample + p.missesInSample + if requestCount < p.sketch.sampleSize { return } - hitRate := float64(p.HitsInSample) / float64(requestCount) - hitRateChange := hitRate - p.PreviousSampleHitRate - amount := p.StepSize + hitRate := float64(p.hitsInSample) / float64(requestCount) + hitRateChange := hitRate - p.previousSampleHitRate + amount := p.stepSize if hitRateChange < 0 { - amount = -p.StepSize + amount = -p.stepSize } var nextStepSize float64 if abs(hitRateChange) >= hillClimberRestartThreshold { @@ -415,20 +411,20 @@ func (p *Policy[K, V]) determineAdjustment() { if amount >= 0 { k = float64(1) } - nextStepSize = hillClimberStepPercent * float64(p.Maximum) * k + nextStepSize = hillClimberStepPercent * float64(p.maximum) * k } else { nextStepSize = hillClimberStepDecayRate * amount } - p.PreviousSampleHitRate = hitRate - p.Adjustment = int64(amount) - p.StepSize = nextStepSize - p.MissesInSample = 0 - p.HitsInSample = 0 + p.previousSampleHitRate = hitRate + p.adjustment = int64(amount) + p.stepSize = nextStepSize + p.missesInSample = 0 + p.hitsInSample = 0 } -func (p *Policy[K, V]) demoteFromMainProtected() { - mainProtectedMaximum := p.MainProtectedMaximum - mainProtectedWeightedSize := p.MainProtectedWeightedSize +func (p *policy[K, V]) demoteFromMainProtected() { + mainProtectedMaximum := p.mainProtectedMaximum + mainProtectedWeightedSize := p.mainProtectedWeightedSize if mainProtectedWeightedSize <= mainProtectedMaximum { return } @@ -438,36 +434,36 @@ func (p *Policy[K, V]) demoteFromMainProtected() { break } - demoted := p.Protected.PopFront() + demoted := p.protected.PopFront() if node.Equals(demoted, nil) { break } demoted.MakeMainProbation() - p.Probation.PushBack(demoted) + p.probation.PushBack(demoted) mainProtectedWeightedSize -= uint64(demoted.Weight()) } - p.MainProtectedWeightedSize = mainProtectedWeightedSize + p.mainProtectedWeightedSize = mainProtectedWeightedSize } -func (p *Policy[K, V]) increaseWindow() { - if p.MainProtectedMaximum == 0 { +func (p *policy[K, V]) increaseWindow() { + if p.mainProtectedMaximum == 0 { return } - quota := p.Adjustment - if p.MainProtectedMaximum < uint64(p.Adjustment) { - quota = int64(p.MainProtectedMaximum) + quota := p.adjustment + if p.mainProtectedMaximum < uint64(p.adjustment) { + quota = int64(p.mainProtectedMaximum) } - p.MainProtectedMaximum -= uint64(quota) - p.WindowMaximum += uint64(quota) + p.mainProtectedMaximum -= uint64(quota) + p.windowMaximum += uint64(quota) p.demoteFromMainProtected() for i := 0; i < queueTransferThreshold; i++ { - candidate := p.Probation.Head() + candidate := p.probation.Head() probation := true if node.Equals(candidate, nil) || quota < int64(candidate.Weight()) { - candidate = p.Protected.Head() + candidate = p.protected.Head() probation = false } if node.Equals(candidate, nil) { @@ -481,36 +477,36 @@ func (p *Policy[K, V]) increaseWindow() { quota -= int64(weight) if probation { - p.Probation.Delete(candidate) + p.probation.Delete(candidate) } else { - p.MainProtectedWeightedSize -= weight - p.Protected.Delete(candidate) + p.mainProtectedWeightedSize -= weight + p.protected.Delete(candidate) } - p.WindowWeightedSize += weight - p.Window.PushBack(candidate) + p.windowWeightedSize += weight + p.window.PushBack(candidate) candidate.MakeWindow() } - p.MainProtectedMaximum += uint64(quota) - p.WindowMaximum -= uint64(quota) - p.Adjustment = quota + p.mainProtectedMaximum += uint64(quota) + p.windowMaximum -= uint64(quota) + p.adjustment = quota } -func (p *Policy[K, V]) decreaseWindow() { - if p.WindowMaximum <= 1 { +func (p *policy[K, V]) decreaseWindow() { + if p.windowMaximum <= 1 { return } - quota := -p.Adjustment - windowMaximum := max(0, p.WindowMaximum-1) - if windowMaximum < uint64(-p.Adjustment) { + quota := -p.adjustment + windowMaximum := max(0, p.windowMaximum-1) + if windowMaximum < uint64(-p.adjustment) { quota = int64(windowMaximum) } - p.MainProtectedMaximum += uint64(quota) - p.WindowMaximum -= uint64(quota) + p.mainProtectedMaximum += uint64(quota) + p.windowMaximum -= uint64(quota) for i := 0; i < queueTransferThreshold; i++ { - candidate := p.Window.Head() + candidate := p.window.Head() if node.Equals(candidate, nil) { break } @@ -521,15 +517,15 @@ func (p *Policy[K, V]) decreaseWindow() { } quota -= weight - p.WindowWeightedSize -= uint64(weight) - p.Window.Delete(candidate) - p.Probation.PushBack(candidate) + p.windowWeightedSize -= uint64(weight) + p.window.Delete(candidate) + p.probation.PushBack(candidate) candidate.MakeMainProbation() } - p.MainProtectedMaximum -= uint64(quota) - p.WindowMaximum += uint64(quota) - p.Adjustment = -quota + p.mainProtectedMaximum -= uint64(quota) + p.windowMaximum += uint64(quota) + p.adjustment = -quota } func abs(a float64) float64 { diff --git a/vendor/github.com/maypok86/otter/v2/internal/eviction/tinylfu/sketch.go b/vendor/github.com/maypok86/otter/v2/sketch.go similarity index 64% rename from vendor/github.com/maypok86/otter/v2/internal/eviction/tinylfu/sketch.go rename to vendor/github.com/maypok86/otter/v2/sketch.go index 98c321a2..35baf0c4 100644 --- a/vendor/github.com/maypok86/otter/v2/internal/eviction/tinylfu/sketch.go +++ b/vendor/github.com/maypok86/otter/v2/sketch.go @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tinylfu +package otter import ( "math" "math/bits" + "sync/atomic" "github.com/maypok86/otter/v2/internal/xmath" "github.com/maypok86/otter/v2/internal/xruntime" @@ -27,76 +28,80 @@ const ( oneMask = 0x1111111111111111 ) -// Sketch is a probabilistic multiset for estimating the popularity of an element within a time window. The +// sketch is a probabilistic multiset for estimating the popularity of an element within a time window. The // maximum frequency of an element is limited to 15 (4-bits) and an aging process periodically // halves the popularity of all elements. -type Sketch[K comparable] struct { - Table []uint64 - SampleSize uint64 - BlockMask uint64 - Size uint64 - hasher xruntime.Hasher[K] +type sketch[K comparable] struct { + table []uint64 + sampleSize uint64 + blockMask uint64 + size uint64 + hasher xruntime.Hasher[K] + isInitialized atomic.Bool } -func newSketch[K comparable]() *Sketch[K] { - return &Sketch[K]{ +func newSketch[K comparable]() *sketch[K] { + return &sketch[K]{ hasher: xruntime.NewHasher[K](), } } -func (s *Sketch[K]) EnsureCapacity(maximumSize uint64) { - if uint64(len(s.Table)) >= maximumSize { +func (s *sketch[K]) ensureCapacity(maximumSize uint64) { + if uint64(len(s.table)) >= maximumSize { return } + if !s.isInitialized.Load() { + s.isInitialized.Store(true) + } newSize := xmath.RoundUpPowerOf264(maximumSize) if newSize < 8 { newSize = 8 } - s.Table = make([]uint64, newSize) - s.SampleSize = 10 + s.table = make([]uint64, newSize) + s.sampleSize = 10 if maximumSize != 0 { - s.SampleSize = 10 * maximumSize + s.sampleSize = 10 * maximumSize } - s.BlockMask = (uint64(len(s.Table)) >> 3) - 1 - s.Size = 0 + s.blockMask = (uint64(len(s.table)) >> 3) - 1 + s.size = 0 s.hasher = xruntime.NewHasher[K]() } -func (s *Sketch[K]) IsNotInitialized() bool { - return s.Table == nil +func (s *sketch[K]) isNotInitialized() bool { + return !s.isInitialized.Load() } -func (s *Sketch[K]) Frequency(k K) uint64 { - if s.IsNotInitialized() { +func (s *sketch[K]) frequency(k K) uint64 { + if s.isNotInitialized() { return 0 } frequency := uint64(math.MaxUint64) blockHash := s.hash(k) counterHash := rehash(blockHash) - block := (blockHash & s.BlockMask) << 3 + block := (blockHash & s.blockMask) << 3 for i := uint64(0); i < 4; i++ { h := counterHash >> (i << 3) index := (h >> 1) & 15 offset := h & 1 slot := block + offset + (i << 1) - count := (s.Table[slot] >> (index << 2)) & 0xf + count := (s.table[slot] >> (index << 2)) & 0xf frequency = min(frequency, count) } return frequency } -func (s *Sketch[K]) Increment(k K) { - if s.IsNotInitialized() { +func (s *sketch[K]) increment(k K) { + if s.isNotInitialized() { return } blockHash := s.hash(k) counterHash := rehash(blockHash) - block := (blockHash & s.BlockMask) << 3 + block := (blockHash & s.blockMask) << 3 // Loop unrolling improves throughput by 10m ops/s h0 := counterHash @@ -120,34 +125,34 @@ func (s *Sketch[K]) Increment(k K) { added = s.incrementAt(slot3, index3) || added if added { - s.Size++ - if s.Size == s.SampleSize { + s.size++ + if s.size == s.sampleSize { s.reset() } } } -func (s *Sketch[K]) incrementAt(i, j uint64) bool { +func (s *sketch[K]) incrementAt(i, j uint64) bool { offset := j << 2 mask := uint64(0xf) << offset - if (s.Table[i] & mask) != mask { - s.Table[i] += uint64(1) << offset + if (s.table[i] & mask) != mask { + s.table[i] += uint64(1) << offset return true } return false } -func (s *Sketch[K]) reset() { +func (s *sketch[K]) reset() { count := 0 - for i := 0; i < len(s.Table); i++ { - count += bits.OnesCount64(s.Table[i] & oneMask) - s.Table[i] = (s.Table[i] >> 1) & resetMask + for i := 0; i < len(s.table); i++ { + count += bits.OnesCount64(s.table[i] & oneMask) + s.table[i] = (s.table[i] >> 1) & resetMask } //nolint:gosec // there's no overflow - s.Size = (s.Size - (uint64(count) >> 2)) >> 1 + s.size = (s.size - (uint64(count) >> 2)) >> 1 } -func (s *Sketch[K]) hash(k K) uint64 { +func (s *sketch[K]) hash(k K) uint64 { return spread(s.hasher.Hash(k)) } diff --git a/vendor/modules.txt b/vendor/modules.txt index e1ef5f66..04f6c352 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -119,17 +119,16 @@ github.com/mailru/easyjson github.com/mailru/easyjson/buffer github.com/mailru/easyjson/jlexer github.com/mailru/easyjson/jwriter -# github.com/maypok86/otter/v2 v2.0.0 +# github.com/maypok86/otter/v2 v2.1.0 ## explicit; go 1.24 github.com/maypok86/otter/v2 -github.com/maypok86/otter/v2/internal/clock github.com/maypok86/otter/v2/internal/deque github.com/maypok86/otter/v2/internal/deque/queue -github.com/maypok86/otter/v2/internal/eviction/tinylfu github.com/maypok86/otter/v2/internal/expiration github.com/maypok86/otter/v2/internal/generated/node github.com/maypok86/otter/v2/internal/hashmap github.com/maypok86/otter/v2/internal/lossy +github.com/maypok86/otter/v2/internal/xiter github.com/maypok86/otter/v2/internal/xmath github.com/maypok86/otter/v2/internal/xruntime github.com/maypok86/otter/v2/internal/xsync