mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-02-12 19:29:16 -06:00
fix: remove rate limited items from in memory buffer (#2207)
This commit is contained in:
@@ -40,6 +40,7 @@ type AssignResults struct {
|
||||
Unassigned []*sqlcv1.V1QueueItem
|
||||
SchedulingTimedOut []*sqlcv1.V1QueueItem
|
||||
RateLimited []*RateLimitResult
|
||||
RateLimitedToMove []*RateLimitResult
|
||||
}
|
||||
|
||||
type queueFactoryRepository struct {
|
||||
@@ -218,15 +219,12 @@ func (d *queueRepository) MarkQueueItemsProcessed(ctx context.Context, r *Assign
|
||||
}
|
||||
|
||||
// remove rate limited queue items from the queue and place them in the v1_rate_limited_queue_items table
|
||||
// we only do this if the requeue_after time is at least 2 seconds in the future, to avoid thrashing
|
||||
qisToMoveToRateLimited := make([]int64, 0, len(r.RateLimited))
|
||||
qisToMoveToRateLimitedRQAfter := make([]pgtype.Timestamptz, 0, len(r.RateLimited))
|
||||
|
||||
for _, row := range r.RateLimited {
|
||||
if row.NextRefillAt != nil && row.NextRefillAt.UTC().After(time.Now().UTC().Add(rateLimitedRequeueAfterThreshold)) {
|
||||
qisToMoveToRateLimited = append(qisToMoveToRateLimited, row.ID)
|
||||
qisToMoveToRateLimitedRQAfter = append(qisToMoveToRateLimitedRQAfter, sqlchelpers.TimestamptzFromTime(*row.NextRefillAt))
|
||||
}
|
||||
for _, row := range r.RateLimitedToMove {
|
||||
qisToMoveToRateLimited = append(qisToMoveToRateLimited, row.ID)
|
||||
qisToMoveToRateLimitedRQAfter = append(qisToMoveToRateLimitedRQAfter, sqlchelpers.TimestamptzFromTime(*row.NextRefillAt))
|
||||
}
|
||||
|
||||
if len(qisToMoveToRateLimited) > 0 {
|
||||
|
||||
@@ -225,7 +225,7 @@ func (q *Queuer) loopQueue(ctx context.Context) {
|
||||
|
||||
countMu.Lock()
|
||||
count += numFlushed
|
||||
processedQiLength += len(ar.assigned) + len(ar.unassigned) + len(ar.schedulingTimedOut) + len(ar.rateLimited)
|
||||
processedQiLength += len(ar.assigned) + len(ar.unassigned) + len(ar.schedulingTimedOut) + len(ar.rateLimited) + len(ar.rateLimitedToMove)
|
||||
countMu.Unlock()
|
||||
|
||||
if sinceStart := time.Since(startFlush); sinceStart > 100*time.Millisecond {
|
||||
@@ -407,6 +407,11 @@ func (q *Queuer) ack(r *assignResults) {
|
||||
delete(q.unassigned, schedulingTimedOutItem.ID)
|
||||
}
|
||||
|
||||
for _, rateLimitedItemToMove := range r.rateLimitedToMove {
|
||||
delete(q.unacked, rateLimitedItemToMove.qi.ID)
|
||||
delete(q.unassigned, rateLimitedItemToMove.qi.ID)
|
||||
}
|
||||
|
||||
for _, rateLimitedItem := range r.rateLimited {
|
||||
delete(q.unacked, rateLimitedItem.qi.ID)
|
||||
q.unassigned[rateLimitedItem.qi.ID] = rateLimitedItem.qi
|
||||
@@ -437,7 +442,7 @@ func (q *Queuer) flushToDatabase(ctx context.Context, r *assignResults) int {
|
||||
|
||||
q.l.Debug().Int("assigned", len(r.assigned)).Int("unassigned", len(r.unassigned)).Int("scheduling_timed_out", len(r.schedulingTimedOut)).Msg("flushing to database")
|
||||
|
||||
if len(r.assigned) == 0 && len(r.unassigned) == 0 && len(r.schedulingTimedOut) == 0 && len(r.rateLimited) == 0 {
|
||||
if len(r.assigned) == 0 && len(r.unassigned) == 0 && len(r.schedulingTimedOut) == 0 && len(r.rateLimited) == 0 && len(r.rateLimitedToMove) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -446,6 +451,7 @@ func (q *Queuer) flushToDatabase(ctx context.Context, r *assignResults) int {
|
||||
Unassigned: r.unassigned,
|
||||
SchedulingTimedOut: r.schedulingTimedOut,
|
||||
RateLimited: make([]*v1.RateLimitResult, 0, len(r.rateLimited)),
|
||||
RateLimitedToMove: make([]*v1.RateLimitResult, 0, len(r.rateLimitedToMove)),
|
||||
}
|
||||
|
||||
stepRunIdsToAcks := make(map[int64]int, len(r.assigned))
|
||||
@@ -472,6 +478,19 @@ func (q *Queuer) flushToDatabase(ctx context.Context, r *assignResults) int {
|
||||
})
|
||||
}
|
||||
|
||||
for _, rateLimitedItemToMove := range r.rateLimitedToMove {
|
||||
opts.RateLimitedToMove = append(opts.RateLimitedToMove, &v1.RateLimitResult{
|
||||
V1QueueItem: rateLimitedItemToMove.qi,
|
||||
ExceededKey: rateLimitedItemToMove.exceededKey,
|
||||
ExceededUnits: rateLimitedItemToMove.exceededUnits,
|
||||
ExceededVal: rateLimitedItemToMove.exceededVal,
|
||||
NextRefillAt: rateLimitedItemToMove.nextRefillAt,
|
||||
TaskId: rateLimitedItemToMove.qi.TaskID,
|
||||
TaskInsertedAt: rateLimitedItemToMove.qi.TaskInsertedAt,
|
||||
RetryCount: rateLimitedItemToMove.qi.RetryCount,
|
||||
})
|
||||
}
|
||||
|
||||
succeeded, failed, err := q.repo.MarkQueueItemsProcessed(ctx, opts)
|
||||
|
||||
if err != nil {
|
||||
@@ -518,7 +537,7 @@ func (q *Queuer) flushToDatabase(ctx context.Context, r *assignResults) int {
|
||||
TenantId: q.tenantId,
|
||||
Assigned: succeeded,
|
||||
SchedulingTimedOut: r.schedulingTimedOut,
|
||||
RateLimited: opts.RateLimited,
|
||||
RateLimited: append(opts.RateLimited, opts.RateLimitedToMove...),
|
||||
Unassigned: r.unassigned,
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
|
||||
)
|
||||
|
||||
const MAX_RATE_LIMIT_UPDATE_FREQUENCY = 990 * time.Millisecond // avoid boundary conditions on 1 second polls
|
||||
const MAX_RATE_LIMIT_UPDATE_FREQUENCY = 500 * time.Millisecond // avoid boundary conditions on 1 second polls
|
||||
|
||||
type rateLimit struct {
|
||||
key string
|
||||
|
||||
@@ -17,6 +17,8 @@ import (
|
||||
"github.com/hatchet-dev/hatchet/pkg/scheduling/v0/randomticker"
|
||||
)
|
||||
|
||||
const rateLimitedRequeueAfterThreshold = 2 * time.Second
|
||||
|
||||
// Scheduler is responsible for scheduling steps to workers as efficiently as possible.
|
||||
// This is tenant-scoped, so each tenant will have its own scheduler.
|
||||
type Scheduler struct {
|
||||
@@ -423,6 +425,19 @@ type scheduleRateLimitResult struct {
|
||||
qi *sqlcv1.V1QueueItem
|
||||
}
|
||||
|
||||
// shouldRemoveFromQueue returns true if the queue item is being rate limited and should be removed from the queue
|
||||
// until the rate limit is reset.
|
||||
// we only do this if the requeue_after time is at least 2 seconds in the future, to avoid thrashing
|
||||
func (s *scheduleRateLimitResult) shouldRemoveFromQueue() bool {
|
||||
if s.rateLimitResult == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
nextRefillAt := s.nextRefillAt
|
||||
|
||||
return nextRefillAt != nil && nextRefillAt.UTC().After(time.Now().UTC().Add(rateLimitedRequeueAfterThreshold))
|
||||
}
|
||||
|
||||
type assignSingleResult struct {
|
||||
qi *sqlcv1.V1QueueItem
|
||||
|
||||
@@ -661,6 +676,7 @@ type assignResults struct {
|
||||
unassigned []*sqlcv1.V1QueueItem
|
||||
schedulingTimedOut []*sqlcv1.V1QueueItem
|
||||
rateLimited []*scheduleRateLimitResult
|
||||
rateLimitedToMove []*scheduleRateLimitResult
|
||||
}
|
||||
|
||||
func (s *Scheduler) tryAssign(
|
||||
@@ -725,6 +741,7 @@ func (s *Scheduler) tryAssign(
|
||||
err := queueutils.BatchLinear(50, batched, func(batchQis []*sqlcv1.V1QueueItem) error {
|
||||
batchAssigned := make([]*assignedQueueItem, 0, len(batchQis))
|
||||
batchRateLimited := make([]*scheduleRateLimitResult, 0, len(batchQis))
|
||||
batchRateLimitedToMove := make([]*scheduleRateLimitResult, 0, len(batchQis))
|
||||
batchUnassigned := make([]*sqlcv1.V1QueueItem, 0, len(batchQis))
|
||||
|
||||
batchStart := time.Now()
|
||||
@@ -740,7 +757,12 @@ func (s *Scheduler) tryAssign(
|
||||
for _, singleRes := range results {
|
||||
if !singleRes.succeeded {
|
||||
if singleRes.rateLimitResult != nil {
|
||||
batchRateLimited = append(batchRateLimited, singleRes.rateLimitResult)
|
||||
if singleRes.rateLimitResult.shouldRemoveFromQueue() {
|
||||
|
||||
batchRateLimitedToMove = append(batchRateLimitedToMove, singleRes.rateLimitResult)
|
||||
} else {
|
||||
batchRateLimited = append(batchRateLimited, singleRes.rateLimitResult)
|
||||
}
|
||||
} else {
|
||||
batchUnassigned = append(batchUnassigned, singleRes.qi)
|
||||
|
||||
@@ -764,9 +786,10 @@ func (s *Scheduler) tryAssign(
|
||||
}
|
||||
|
||||
r := &assignResults{
|
||||
assigned: batchAssigned,
|
||||
rateLimited: batchRateLimited,
|
||||
unassigned: batchUnassigned,
|
||||
assigned: batchAssigned,
|
||||
rateLimited: batchRateLimited,
|
||||
rateLimitedToMove: batchRateLimitedToMove,
|
||||
unassigned: batchUnassigned,
|
||||
}
|
||||
|
||||
extensionResultsMu.Lock()
|
||||
|
||||
Reference in New Issue
Block a user