mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-26 12:18:33 -05:00
Fix: Remove internal replay batching for now (#1992)
* fix: remove batching, run replays serially
* proposal: do this at the replay controller level
* Revert "fix: remove batching, run replays serially"
This reverts commit 21a93bb260.
* feat: advisory lock
* fix: add prefix to lock
This commit is contained in:
@@ -659,17 +659,27 @@ func (tc *TasksControllerImpl) handleReplayTasks(ctx context.Context, tenantId s
|
||||
}
|
||||
}
|
||||
|
||||
replayRes, err := tc.repov1.Tasks().ReplayTasks(ctx, tenantId, taskIdRetryCounts)
|
||||
replayedTasks := make([]v1.TaskIdInsertedAtRetryCount, 0, len(taskIdRetryCounts))
|
||||
upsertedTasks := make([]*sqlcv1.V1Task, 0, len(taskIdRetryCounts))
|
||||
createdTasks := make([]*sqlcv1.V1Task, 0, len(taskIdRetryCounts))
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not replay tasks: %w", err)
|
||||
for _, task := range taskIdRetryCounts {
|
||||
replayRes, err := tc.repov1.Tasks().ReplayTasks(ctx, tenantId, []v1.TaskIdInsertedAtRetryCount{task})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to replay task: %w", err)
|
||||
}
|
||||
|
||||
replayedTasks = append(replayedTasks, replayRes.ReplayedTasks...)
|
||||
upsertedTasks = append(upsertedTasks, replayRes.UpsertedTasks...)
|
||||
createdTasks = append(createdTasks, replayRes.InternalEventResults.CreatedTasks...)
|
||||
}
|
||||
|
||||
eg := &errgroup.Group{}
|
||||
|
||||
if len(replayRes.ReplayedTasks) > 0 {
|
||||
if len(replayedTasks) > 0 {
|
||||
eg.Go(func() error {
|
||||
err = tc.signalTasksReplayed(ctx, tenantId, replayRes.ReplayedTasks)
|
||||
err := tc.signalTasksReplayed(ctx, tenantId, replayedTasks)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not signal replayed tasks: %w", err)
|
||||
@@ -679,9 +689,9 @@ func (tc *TasksControllerImpl) handleReplayTasks(ctx context.Context, tenantId s
|
||||
})
|
||||
}
|
||||
|
||||
if len(replayRes.UpsertedTasks) > 0 {
|
||||
if len(upsertedTasks) > 0 {
|
||||
eg.Go(func() error {
|
||||
err = tc.signalTasksUpdated(ctx, tenantId, replayRes.UpsertedTasks)
|
||||
err := tc.signalTasksUpdated(ctx, tenantId, upsertedTasks)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not signal queued tasks: %w", err)
|
||||
@@ -691,9 +701,9 @@ func (tc *TasksControllerImpl) handleReplayTasks(ctx context.Context, tenantId s
|
||||
})
|
||||
}
|
||||
|
||||
if len(replayRes.InternalEventResults.CreatedTasks) > 0 {
|
||||
if len(createdTasks) > 0 {
|
||||
eg.Go(func() error {
|
||||
err = tc.signalTasksCreated(ctx, tenantId, replayRes.InternalEventResults.CreatedTasks)
|
||||
err := tc.signalTasksCreated(ctx, tenantId, createdTasks)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not signal created tasks: %w", err)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
@@ -2422,6 +2423,12 @@ func makeEventTypeArr(status sqlcv1.V1TaskEventType, n int) []sqlcv1.V1TaskEvent
|
||||
return a
|
||||
}
|
||||
|
||||
func hash(s string) int64 {
|
||||
h := fnv.New64a()
|
||||
h.Write([]byte(s))
|
||||
return int64(h.Sum64())
|
||||
}
|
||||
|
||||
func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error) {
|
||||
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 30000)
|
||||
|
||||
@@ -2429,6 +2436,12 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = r.queries.AdvisoryLock(ctx, tx, hash("replay_" + tenantId))
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to acquire advisory lock: %w", err)
|
||||
}
|
||||
|
||||
defer rollback()
|
||||
|
||||
taskIds := make([]int64, len(tasks))
|
||||
|
||||
Reference in New Issue
Block a user