chore(v1): small improvements to replay/parent task lookup (#1346)

* small tweaks to replay/parent task lookup

* some more improvements
This commit is contained in:
abelanger5
2025-03-15 09:15:57 -04:00
committed by GitHub
parent 7ad251df26
commit 5c647e247e
4 changed files with 54 additions and 22 deletions

View File

@@ -127,7 +127,7 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms
parentDataMap, err := d.repov1.Tasks().ListTaskParentOutputs(ctx, msg.TenantID, bulkDatas)
if err != nil {
return fmt.Errorf("could not list parent data: %w", err)
return fmt.Errorf("could not list parent data for %d tasks: %w", len(bulkDatas), err)
}
for _, task := range bulkDatas {

View File

@@ -626,6 +626,7 @@ WITH RECURSIVE augmented_tasks AS (
-- First, select the tasks from the input
SELECT
id,
inserted_at,
tenant_id,
dag_id,
dag_inserted_at,
@@ -645,6 +646,7 @@ WITH RECURSIVE augmented_tasks AS (
-- Then, select the tasks that are children of the input tasks
SELECT
t.id,
t.inserted_at,
t.tenant_id,
t.dag_id,
t.dag_inserted_at,
@@ -683,8 +685,9 @@ WITH RECURSIVE augmented_tasks AS (
FROM
v1_task t
JOIN
-- TODO: USE INSERTED_AT
augmented_tasks at ON at.id = t.id AND at.tenant_id = t.tenant_id
augmented_tasks at ON at.id = t.id AND at.inserted_at = t.inserted_at
WHERE
t.tenant_id = @tenantId::uuid
-- order by the task id to get a stable lock order
ORDER BY
id

View File

@@ -955,6 +955,7 @@ WITH RECURSIVE augmented_tasks AS (
-- First, select the tasks from the input
SELECT
id,
inserted_at,
tenant_id,
dag_id,
dag_inserted_at,
@@ -974,6 +975,7 @@ WITH RECURSIVE augmented_tasks AS (
-- Then, select the tasks that are children of the input tasks
SELECT
t.id,
t.inserted_at,
t.tenant_id,
t.dag_id,
t.dag_inserted_at,
@@ -1012,8 +1014,9 @@ WITH RECURSIVE augmented_tasks AS (
FROM
v1_task t
JOIN
-- TODO: USE INSERTED_AT
augmented_tasks at ON at.id = t.id AND at.tenant_id = t.tenant_id
augmented_tasks at ON at.id = t.id AND at.inserted_at = t.inserted_at
WHERE
t.tenant_id = $3::uuid
-- order by the task id to get a stable lock order
ORDER BY
id

View File

@@ -2142,7 +2142,7 @@ func makeEventTypeArr(status sqlcv1.V1TaskEventType, n int) []sqlcv1.V1TaskEvent
}
func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error) {
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000)
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 30000)
if err != nil {
return nil, err
@@ -2166,7 +2166,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to list tasks for replay: %w", err)
}
lockedTaskIds := make([]int64, len(lockedTasks))
@@ -2201,7 +2201,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to lock DAGs for replay: %w", err)
}
successfullyLockedDAGsMap := make(map[int64]bool)
@@ -2222,7 +2222,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to preflight check DAGs for replay: %w", err)
}
for _, dag := range preflightDAGs {
@@ -2239,7 +2239,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to preflight check tasks for replay: %w", err)
}
for _, task := range failedPreflightChecks {
@@ -2301,6 +2301,30 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
}
}
if task.DagID.Valid && task.JobKind == sqlcv1.JobKindONFAILURE {
// we need to check if there are other steps in the subtree
doesOnFailureHaveOtherSteps := false
for stepId := range subtreeStepIds[task.DagID.Int64] {
if stepId == sqlchelpers.UUIDToStr(task.StepID) {
continue
}
doesOnFailureHaveOtherSteps = true
break
}
if doesOnFailureHaveOtherSteps {
if _, ok := dagIdsToChildTasks[task.DagID.Int64]; !ok {
dagIdsToChildTasks[task.DagID.Int64] = make([]*sqlcv1.ListTasksForReplayRow, 0)
}
dagIdsToChildTasks[task.DagID.Int64] = append(dagIdsToChildTasks[task.DagID.Int64], task)
continue
}
}
replayOpts = append(replayOpts, ReplayTaskOpts{
TaskId: task.ID,
InsertedAt: task.InsertedAt,
@@ -2324,7 +2348,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to list all tasks in DAGs: %w", err)
}
dagIdsToAllTasks := make(map[int64][]*sqlcv1.ListAllTasksInDagsRow)
@@ -2344,7 +2368,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
upsertedTasks, err = r.replayTasks(ctx, tx, tenantId, replayOpts)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to replay existing tasks: %w", err)
}
}
@@ -2404,7 +2428,7 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to delete matching signal events: %w", err)
}
}
@@ -2492,14 +2516,14 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
reconstructedMatches, candidateEvents, err := r.reconstructGroupConditions(ctx, tx, tenantId, subtreeExternalIds, eventMatches)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to reconstruct group conditions: %w", err)
}
// create the event matches
err = r.createEventMatches(ctx, tx, tenantId, reconstructedMatches)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create event matches: %w", err)
}
// process event matches
@@ -2507,11 +2531,11 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
internalMatchResults, err := r.processInternalEventMatches(ctx, tx, tenantId, candidateEvents)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to process internal event matches: %w", err)
}
if err := commit(ctx); err != nil {
return nil, err
return nil, fmt.Errorf("failed to commit transaction: %w", err)
}
return &ReplayTasksResult{
@@ -2717,12 +2741,14 @@ func uniqueSet(taskIdRetryCounts []TaskIdInsertedAtRetryCount) []TaskIdInsertedA
}
func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error) {
taskIds := make([]int64, len(tasks))
taskInsertedAts := make([]pgtype.Timestamptz, len(tasks))
taskIds := make([]int64, 0)
taskInsertedAts := make([]pgtype.Timestamptz, 0)
for i, task := range tasks {
taskIds[i] = task.ID
taskInsertedAts[i] = task.InsertedAt
for _, task := range tasks {
if task.DagID.Valid {
taskIds = append(taskIds, task.ID)
taskInsertedAts = append(taskInsertedAts, task.InsertedAt)
}
}
res, err := r.queries.ListTaskParentOutputs(ctx, r.pool, sqlcv1.ListTaskParentOutputsParams{