mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-06 08:49:53 -06:00
Refactors the queueing logic to be fairly balanced between actions, with each action backed as a separate FIFO queue. Also adds support for priority queueing and custom queues, though those aren't exposed on the API layer yet. Improves throughput to be > 5000 tasks/second on a single queue. --------- Co-authored-by: Alexander Belanger <alexander@hatchet.run>
62 lines
1.6 KiB
Go
62 lines
1.6 KiB
Go
package scheduling
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// ExhaustedRateLimitCache is a cache of rate limits to their next refill time, which avoids querying queues
|
|
// where we know we're already rate-limited.
|
|
type ExhaustedRateLimitCache struct {
|
|
rlKeysToRefillTimes sync.Map
|
|
maxCacheDuration time.Duration
|
|
}
|
|
|
|
// NewExhaustedRateLimitCache creates a new ExhaustedRateLimitCache.
|
|
func NewExhaustedRateLimitCache(maxCacheDuration time.Duration) *ExhaustedRateLimitCache {
|
|
return &ExhaustedRateLimitCache{
|
|
maxCacheDuration: maxCacheDuration,
|
|
}
|
|
}
|
|
|
|
type cacheEntry struct {
|
|
minRefillTime time.Time
|
|
}
|
|
|
|
func (rlc *ExhaustedRateLimitCache) Set(tenantId, queue string, exhaustedRateLimitRefillTimes []time.Time) {
|
|
minRefillTime := time.Now().Add(rlc.maxCacheDuration)
|
|
|
|
for _, refillTime := range exhaustedRateLimitRefillTimes {
|
|
if refillTime.Before(minRefillTime) {
|
|
minRefillTime = refillTime
|
|
}
|
|
}
|
|
|
|
rlc.rlKeysToRefillTimes.Store(getKeyName(tenantId, queue), cacheEntry{
|
|
minRefillTime: minRefillTime,
|
|
})
|
|
}
|
|
|
|
// Get returns true if the rate limit is not exhausted, false otherwise.
|
|
func (rlc *ExhaustedRateLimitCache) IsExhausted(tenantId, queue string) bool {
|
|
refillTime, ok := rlc.rlKeysToRefillTimes.Load(getKeyName(tenantId, queue))
|
|
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
isExhausted := refillTime.(cacheEntry).minRefillTime.After(time.Now())
|
|
|
|
// if the cache entry is expired, remove it
|
|
if !isExhausted {
|
|
rlc.rlKeysToRefillTimes.Delete(getKeyName(tenantId, queue))
|
|
}
|
|
|
|
return isExhausted
|
|
}
|
|
|
|
func getKeyName(tenantId, key string) string {
|
|
return fmt.Sprintf("%s:%s", tenantId, key)
|
|
}
|