mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-22 10:10:07 -05:00
@@ -2,6 +2,7 @@ package queueutils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -11,6 +12,10 @@ import (
|
||||
type OpMethod func(ctx context.Context, id string) (bool, error)
|
||||
|
||||
// SerialOperation represents a method that can only run serially.
|
||||
// It can be configured with a maxJitter duration to add a random delay
|
||||
// before executing, which helps prevent the "thundering herd" problem
|
||||
// when many operations might start at the same time. The jitter is disabled
|
||||
// by default (maxJitter=0) and can be enabled via OperationPool.WithJitter().
|
||||
type SerialOperation struct {
|
||||
mu sync.RWMutex
|
||||
shouldContinue bool
|
||||
@@ -20,9 +25,15 @@ type SerialOperation struct {
|
||||
description string
|
||||
timeout time.Duration
|
||||
method OpMethod
|
||||
maxJitter time.Duration
|
||||
}
|
||||
|
||||
func (o *SerialOperation) RunOrContinue(ql *zerolog.Logger) {
|
||||
// Apply jitter if configured
|
||||
if o.maxJitter > 0 {
|
||||
jitter := time.Duration(rand.Int63n(int64(o.maxJitter)))
|
||||
time.Sleep(jitter)
|
||||
}
|
||||
|
||||
o.setContinue(true)
|
||||
o.Run(ql)
|
||||
|
||||
@@ -16,6 +16,7 @@ type OperationPool struct {
|
||||
description string
|
||||
method OpMethod
|
||||
ql *zerolog.Logger
|
||||
maxJitter time.Duration
|
||||
}
|
||||
|
||||
func NewOperationPool(ql *zerolog.Logger, timeout time.Duration, description string, method OpMethod) *OperationPool {
|
||||
@@ -24,9 +25,15 @@ func NewOperationPool(ql *zerolog.Logger, timeout time.Duration, description str
|
||||
description: description,
|
||||
method: method,
|
||||
ql: ql,
|
||||
maxJitter: 0, // Default to no jitter
|
||||
}
|
||||
}
|
||||
|
||||
func (p *OperationPool) WithJitter(maxJitter time.Duration) *OperationPool {
|
||||
p.maxJitter = maxJitter
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *OperationPool) SetTenants(tenants []*dbsqlc.Tenant) {
|
||||
tenantMap := make(map[string]bool)
|
||||
|
||||
@@ -58,6 +65,7 @@ func (p *OperationPool) GetOperation(id string) *SerialOperation {
|
||||
description: p.description,
|
||||
timeout: p.timeout,
|
||||
method: p.method,
|
||||
maxJitter: p.maxJitter,
|
||||
}
|
||||
|
||||
p.ops.Store(id, op)
|
||||
|
||||
Reference in New Issue
Block a user