diff --git a/internal/services/dispatcher/dispatcher_v1.go b/internal/services/dispatcher/dispatcher_v1.go index 544f1f806..d9cbbc637 100644 --- a/internal/services/dispatcher/dispatcher_v1.go +++ b/internal/services/dispatcher/dispatcher_v1.go @@ -127,7 +127,7 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms parentDataMap, err := d.repov1.Tasks().ListTaskParentOutputs(ctx, msg.TenantID, bulkDatas) if err != nil { - return fmt.Errorf("could not list parent data: %w", err) + return fmt.Errorf("could not list parent data for %d tasks: %w", len(bulkDatas), err) } for _, task := range bulkDatas { diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index 4f5981a2a..7570a78de 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -626,6 +626,7 @@ WITH RECURSIVE augmented_tasks AS ( -- First, select the tasks from the input SELECT id, + inserted_at, tenant_id, dag_id, dag_inserted_at, @@ -645,6 +646,7 @@ WITH RECURSIVE augmented_tasks AS ( -- Then, select the tasks that are children of the input tasks SELECT t.id, + t.inserted_at, t.tenant_id, t.dag_id, t.dag_inserted_at, @@ -683,8 +685,9 @@ WITH RECURSIVE augmented_tasks AS ( FROM v1_task t JOIN - -- TODO: USE INSERTED_AT - augmented_tasks at ON at.id = t.id AND at.tenant_id = t.tenant_id + augmented_tasks at ON at.id = t.id AND at.inserted_at = t.inserted_at + WHERE + t.tenant_id = @tenantId::uuid -- order by the task id to get a stable lock order ORDER BY id diff --git a/pkg/repository/v1/sqlcv1/tasks.sql.go b/pkg/repository/v1/sqlcv1/tasks.sql.go index 5ad7def43..b3ea23316 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql.go +++ b/pkg/repository/v1/sqlcv1/tasks.sql.go @@ -955,6 +955,7 @@ WITH RECURSIVE augmented_tasks AS ( -- First, select the tasks from the input SELECT id, + inserted_at, tenant_id, dag_id, dag_inserted_at, @@ -974,6 +975,7 @@ WITH RECURSIVE augmented_tasks AS ( -- Then, select the tasks that are children of the input tasks SELECT t.id, + t.inserted_at, t.tenant_id, t.dag_id, t.dag_inserted_at, @@ -1012,8 +1014,9 @@ WITH RECURSIVE augmented_tasks AS ( FROM v1_task t JOIN - -- TODO: USE INSERTED_AT - augmented_tasks at ON at.id = t.id AND at.tenant_id = t.tenant_id + augmented_tasks at ON at.id = t.id AND at.inserted_at = t.inserted_at + WHERE + t.tenant_id = $3::uuid -- order by the task id to get a stable lock order ORDER BY id diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 195008d2f..96c99fa28 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -2142,7 +2142,7 @@ func makeEventTypeArr(status sqlcv1.V1TaskEventType, n int) []sqlcv1.V1TaskEvent } func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error) { - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 30000) if err != nil { return nil, err @@ -2166,7 +2166,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to list tasks for replay: %w", err) } lockedTaskIds := make([]int64, len(lockedTasks)) @@ -2201,7 +2201,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to lock DAGs for replay: %w", err) } successfullyLockedDAGsMap := make(map[int64]bool) @@ -2222,7 +2222,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to preflight check DAGs for replay: %w", err) } for _, dag := range preflightDAGs { @@ -2239,7 +2239,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to preflight check tasks for replay: %w", err) } for _, task := range failedPreflightChecks { @@ -2301,6 +2301,30 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t } } + if task.DagID.Valid && task.JobKind == sqlcv1.JobKindONFAILURE { + // we need to check if there are other steps in the subtree + doesOnFailureHaveOtherSteps := false + + for stepId := range subtreeStepIds[task.DagID.Int64] { + if stepId == sqlchelpers.UUIDToStr(task.StepID) { + continue + } + + doesOnFailureHaveOtherSteps = true + break + } + + if doesOnFailureHaveOtherSteps { + if _, ok := dagIdsToChildTasks[task.DagID.Int64]; !ok { + dagIdsToChildTasks[task.DagID.Int64] = make([]*sqlcv1.ListTasksForReplayRow, 0) + } + + dagIdsToChildTasks[task.DagID.Int64] = append(dagIdsToChildTasks[task.DagID.Int64], task) + + continue + } + } + replayOpts = append(replayOpts, ReplayTaskOpts{ TaskId: task.ID, InsertedAt: task.InsertedAt, @@ -2324,7 +2348,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to list all tasks in DAGs: %w", err) } dagIdsToAllTasks := make(map[int64][]*sqlcv1.ListAllTasksInDagsRow) @@ -2344,7 +2368,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t upsertedTasks, err = r.replayTasks(ctx, tx, tenantId, replayOpts) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to replay existing tasks: %w", err) } } @@ -2404,7 +2428,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to delete matching signal events: %w", err) } } @@ -2492,14 +2516,14 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t reconstructedMatches, candidateEvents, err := r.reconstructGroupConditions(ctx, tx, tenantId, subtreeExternalIds, eventMatches) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to reconstruct group conditions: %w", err) } // create the event matches err = r.createEventMatches(ctx, tx, tenantId, reconstructedMatches) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create event matches: %w", err) } // process event matches @@ -2507,11 +2531,11 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t internalMatchResults, err := r.processInternalEventMatches(ctx, tx, tenantId, candidateEvents) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to process internal event matches: %w", err) } if err := commit(ctx); err != nil { - return nil, err + return nil, fmt.Errorf("failed to commit transaction: %w", err) } return &ReplayTasksResult{ @@ -2717,12 +2741,14 @@ func uniqueSet(taskIdRetryCounts []TaskIdInsertedAtRetryCount) []TaskIdInsertedA } func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error) { - taskIds := make([]int64, len(tasks)) - taskInsertedAts := make([]pgtype.Timestamptz, len(tasks)) + taskIds := make([]int64, 0) + taskInsertedAts := make([]pgtype.Timestamptz, 0) - for i, task := range tasks { - taskIds[i] = task.ID - taskInsertedAts[i] = task.InsertedAt + for _, task := range tasks { + if task.DagID.Valid { + taskIds = append(taskIds, task.ID) + taskInsertedAts = append(taskInsertedAts, task.InsertedAt) + } } res, err := r.queries.ListTaskParentOutputs(ctx, r.pool, sqlcv1.ListTaskParentOutputsParams{