From 33d1bf60d64fc20e705cdc1eb78674f311ebce00 Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Fri, 13 Jun 2025 18:28:22 -0400 Subject: [PATCH] revert: removing replay logic (#1864) * revert: removing input from replay * add to replayopt as well * add a comment --- pkg/repository/v1/sqlcv1/tasks.sql | 2 ++ pkg/repository/v1/sqlcv1/tasks.sql.go | 4 ++++ pkg/repository/v1/task.go | 4 ++++ 3 files changed, 10 insertions(+) diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index b14e81337..bf4c8ae2d 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -604,6 +604,7 @@ WITH RECURSIVE augmented_tasks AS ( t.step_id, t.workflow_id, t.external_id, + t.input, t.additional_metadata, t.parent_task_external_id, t.parent_task_id, @@ -648,6 +649,7 @@ SELECT t.step_id, t.workflow_id, t.external_id, + t.input, t.additional_metadata, t.parent_task_external_id, t.parent_task_id, diff --git a/pkg/repository/v1/sqlcv1/tasks.sql.go b/pkg/repository/v1/sqlcv1/tasks.sql.go index 48568427f..377cd6340 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql.go +++ b/pkg/repository/v1/sqlcv1/tasks.sql.go @@ -1025,6 +1025,7 @@ WITH RECURSIVE augmented_tasks AS ( t.step_id, t.workflow_id, t.external_id, + t.input, t.additional_metadata, t.parent_task_external_id, t.parent_task_id, @@ -1069,6 +1070,7 @@ SELECT t.step_id, t.workflow_id, t.external_id, + t.input, t.additional_metadata, t.parent_task_external_id, t.parent_task_id, @@ -1104,6 +1106,7 @@ type ListTasksForReplayRow struct { StepID pgtype.UUID `json:"step_id"` WorkflowID pgtype.UUID `json:"workflow_id"` ExternalID pgtype.UUID `json:"external_id"` + Input []byte `json:"input"` AdditionalMetadata []byte `json:"additional_metadata"` ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"` ParentTaskID pgtype.Int8 `json:"parent_task_id"` @@ -1136,6 +1139,7 @@ func (q *Queries) ListTasksForReplay(ctx context.Context, db DBTX, arg ListTasks &i.StepID, &i.WorkflowID, &i.ExternalID, + &i.Input, &i.AdditionalMetadata, &i.ParentTaskExternalID, &i.ParentTaskID, diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 707f775cf..b293ba3aa 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -2585,6 +2585,10 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t ExternalId: sqlchelpers.UUIDToStr(task.ExternalID), InitialState: sqlcv1.V1TaskInitialStateQUEUED, AdditionalMetadata: task.AdditionalMetadata, + // NOTE: we require the input to be passed in to the replay method so we can re-evaluate the concurrency keys + // Ideally we could preserve the same concurrency keys, but the replay tasks method is currently unaware of existing concurrency + // keys because they may change between retries. + Input: r.newTaskInputFromExistingBytes(task.Input), }) }