revert: removing replay logic (#1864)

* revert: removing input from replay

* add to replayopt as well

* add a comment
This commit is contained in:
abelanger5
2025-06-13 18:28:22 -04:00
committed by GitHub
parent 1f63af1181
commit 33d1bf60d6
3 changed files with 10 additions and 0 deletions

View File

@@ -604,6 +604,7 @@ WITH RECURSIVE augmented_tasks AS (
t.step_id,
t.workflow_id,
t.external_id,
t.input,
t.additional_metadata,
t.parent_task_external_id,
t.parent_task_id,
@@ -648,6 +649,7 @@ SELECT
t.step_id,
t.workflow_id,
t.external_id,
t.input,
t.additional_metadata,
t.parent_task_external_id,
t.parent_task_id,

View File

@@ -1025,6 +1025,7 @@ WITH RECURSIVE augmented_tasks AS (
t.step_id,
t.workflow_id,
t.external_id,
t.input,
t.additional_metadata,
t.parent_task_external_id,
t.parent_task_id,
@@ -1069,6 +1070,7 @@ SELECT
t.step_id,
t.workflow_id,
t.external_id,
t.input,
t.additional_metadata,
t.parent_task_external_id,
t.parent_task_id,
@@ -1104,6 +1106,7 @@ type ListTasksForReplayRow struct {
StepID pgtype.UUID `json:"step_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
ExternalID pgtype.UUID `json:"external_id"`
Input []byte `json:"input"`
AdditionalMetadata []byte `json:"additional_metadata"`
ParentTaskExternalID pgtype.UUID `json:"parent_task_external_id"`
ParentTaskID pgtype.Int8 `json:"parent_task_id"`
@@ -1136,6 +1139,7 @@ func (q *Queries) ListTasksForReplay(ctx context.Context, db DBTX, arg ListTasks
&i.StepID,
&i.WorkflowID,
&i.ExternalID,
&i.Input,
&i.AdditionalMetadata,
&i.ParentTaskExternalID,
&i.ParentTaskID,

View File

@@ -2585,6 +2585,10 @@ func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, t
ExternalId: sqlchelpers.UUIDToStr(task.ExternalID),
InitialState: sqlcv1.V1TaskInitialStateQUEUED,
AdditionalMetadata: task.AdditionalMetadata,
// NOTE: we require the input to be passed in to the replay method so we can re-evaluate the concurrency keys
// Ideally we could preserve the same concurrency keys, but the replay tasks method is currently unaware of existing concurrency
// keys because they may change between retries.
Input: r.newTaskInputFromExistingBytes(task.Input),
})
}