fix: more v1 bug bashing (#1334)

This commit is contained in:
abelanger5
2025-03-13 17:13:04 -04:00
committed by GitHub
parent 4c1af57583
commit 4cbde4405a
10 changed files with 520 additions and 43 deletions
@@ -0,0 +1,147 @@
-- +goose Up
-- +goose StatementBegin
-- When concurrency slot is CREATED, we should check whether the parent concurrency slot exists; if not, we should create
-- the parent concurrency slot as well.
-- Statement-level AFTER INSERT trigger function: `new_table` below is the
-- transition table containing all rows inserted by the firing statement.
CREATE OR REPLACE FUNCTION after_v1_concurrency_slot_insert_function()
RETURNS trigger AS $$
BEGIN
-- parent_slot: the subset of newly inserted child slots that reference a
-- parent (workflow-level) concurrency strategy.
WITH parent_slot AS (
SELECT
*
FROM
new_table cs
WHERE
cs.parent_strategy_id IS NOT NULL
-- parent_to_child_strategy_ids: one row per (parent strategy, workflow run),
-- aggregating the inserted children. MAX() picks representative sort_id /
-- priority / key values when several child slots were inserted for the same
-- run in one statement.
-- NOTE(review): ARRAY_AGG(DISTINCT wc.child_strategy_ids) aggregates a column
-- that appears to itself be an array — confirm the intended element type of
-- v1_workflow_concurrency_slot.child_strategy_ids matches this aggregate.
), parent_to_child_strategy_ids AS (
SELECT
wc.id AS parent_strategy_id,
wc.tenant_id,
ps.workflow_id,
ps.workflow_version_id,
ps.workflow_run_id,
MAX(ps.sort_id) AS sort_id,
MAX(ps.priority) AS priority,
MAX(ps.key) AS key,
ARRAY_AGG(DISTINCT wc.child_strategy_ids) AS child_strategy_ids
FROM
parent_slot ps
JOIN v1_workflow_concurrency wc ON wc.workflow_id = ps.workflow_id AND wc.workflow_version_id = ps.workflow_version_id AND wc.id = ps.parent_strategy_id
GROUP BY
wc.id,
wc.tenant_id,
ps.workflow_id,
ps.workflow_version_id,
ps.workflow_run_id
)
-- Create the parent (workflow-level) slot for each inserted child slot.
INSERT INTO v1_workflow_concurrency_slot (
sort_id,
tenant_id,
workflow_id,
workflow_version_id,
workflow_run_id,
strategy_id,
child_strategy_ids,
priority,
key
)
SELECT
pcs.sort_id,
pcs.tenant_id,
pcs.workflow_id,
pcs.workflow_version_id,
pcs.workflow_run_id,
pcs.parent_strategy_id,
pcs.child_strategy_ids,
pcs.priority,
pcs.key
FROM
parent_to_child_strategy_ids pcs
ON CONFLICT (strategy_id, workflow_version_id, workflow_run_id) DO UPDATE
-- If there's a conflict, and we're inserting a new concurrency_slot, we'd like to remove the strategy_id
-- from the completed child strategy ids.
-- (This handles a child slot being re-created — e.g. a retry — after it was
-- previously marked completed on the still-existing parent slot.)
SET completed_child_strategy_ids = ARRAY(
SELECT DISTINCT UNNEST(ARRAY_REMOVE(v1_workflow_concurrency_slot.completed_child_strategy_ids, cs.strategy_id))
FROM new_table cs
WHERE EXCLUDED.strategy_id = cs.parent_strategy_id
);
-- If the v1_step_concurrency strategy is not active, we set it to active.
-- ORDER BY ... FOR UPDATE locks the strategy rows in a deterministic order,
-- which avoids deadlocks between concurrent inserting statements.
WITH inactive_strategies AS (
SELECT
strategy.*
FROM
new_table cs
JOIN
v1_step_concurrency strategy ON strategy.workflow_id = cs.workflow_id AND strategy.workflow_version_id = cs.workflow_version_id AND strategy.id = cs.strategy_id
WHERE
strategy.is_active = FALSE
ORDER BY
strategy.id
FOR UPDATE
)
UPDATE v1_step_concurrency strategy
SET is_active = TRUE
FROM inactive_strategies
WHERE
strategy.workflow_id = inactive_strategies.workflow_id AND
strategy.workflow_version_id = inactive_strategies.workflow_version_id AND
strategy.step_id = inactive_strategies.step_id AND
strategy.id = inactive_strategies.id;
-- AFTER trigger: the return value is ignored, so return NULL.
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Statement-level AFTER DELETE trigger function: `deleted_rows` below is the
-- transition table containing all rows removed by the firing statement.
CREATE OR REPLACE FUNCTION after_v1_concurrency_slot_delete_function()
RETURNS trigger AS $$
BEGIN
-- When v1_concurrency_slot is DELETED, we add it to the completed_child_strategy_ids on the parent, but only
-- when it's NOT a backoff retry. Backoff retries will continue to consume a workflow concurrency slot.
-- parent_slot: deleted child slots with a parent strategy whose task has NO
-- pending retry queue item (the LEFT JOIN + rqi.task_id IS NULL filter is an
-- anti-join excluding backoff retries).
WITH parent_slot AS (
SELECT
cs.workflow_id,
cs.workflow_version_id,
cs.workflow_run_id,
cs.strategy_id,
cs.parent_strategy_id
FROM
deleted_rows cs
JOIN
v1_task t ON t.id = cs.task_id AND t.inserted_at = cs.task_inserted_at
LEFT JOIN
v1_retry_queue_item rqi ON rqi.task_id = t.id AND rqi.task_inserted_at = t.inserted_at
WHERE
cs.parent_strategy_id IS NOT NULL
AND rqi.task_id IS NULL
-- locked_parent_slots: lock the affected parent workflow slots in a
-- deterministic order (ORDER BY ... FOR UPDATE) to avoid deadlocks between
-- concurrent deleting statements.
), locked_parent_slots AS (
SELECT
wcs.strategy_id,
wcs.workflow_version_id,
wcs.workflow_run_id,
cs.strategy_id AS child_strategy_id
FROM
v1_workflow_concurrency_slot wcs
JOIN
parent_slot cs ON (wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id) = (cs.parent_strategy_id, cs.workflow_version_id, cs.workflow_run_id)
ORDER BY
wcs.strategy_id,
wcs.workflow_version_id,
wcs.workflow_run_id
FOR UPDATE
)
-- Append the deleted child's strategy id to the parent's completed list;
-- SELECT DISTINCT UNNEST(...) deduplicates in case it was already present.
-- NOTE(review): element order of the rebuilt array is not guaranteed by this
-- subquery — confirm no reader depends on completed_child_strategy_ids order.
UPDATE v1_workflow_concurrency_slot wcs
SET completed_child_strategy_ids = ARRAY(
SELECT DISTINCT UNNEST(ARRAY_APPEND(wcs.completed_child_strategy_ids, cs.child_strategy_id))
)
FROM locked_parent_slots cs
WHERE
wcs.strategy_id = cs.strategy_id
AND wcs.workflow_version_id = cs.workflow_version_id
AND wcs.workflow_run_id = cs.workflow_run_id;
-- AFTER trigger: the return value is ignored, so return NULL.
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- +goose StatementEnd
@@ -402,7 +402,9 @@ func (tc *OLAPControllerImpl) handleCreateMonitoringEvent(ctx context.Context, t
switch eventTypes[i] {
case sqlcv1.V1EventTypeOlapFINISHED:
event.Output = []byte(eventPayloads[i])
if eventPayloads[i] != "" {
event.Output = []byte(eventPayloads[i])
}
case sqlcv1.V1EventTypeOlapFAILED:
event.ErrorMessage = sqlchelpers.TextFromStr(eventPayloads[i])
case sqlcv1.V1EventTypeOlapCANCELLED:
@@ -2,6 +2,7 @@ package dispatcher
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
@@ -14,6 +15,7 @@ import (
tasktypesv1 "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
)
@@ -122,6 +124,48 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms
return fmt.Errorf("could not bulk list step run data: %w", err)
}
parentDataMap, err := d.repov1.Tasks().ListTaskParentOutputs(ctx, msg.TenantID, bulkDatas)
if err != nil {
return fmt.Errorf("could not list parent data: %w", err)
}
for _, task := range bulkDatas {
if parentData, ok := parentDataMap[task.ID]; ok {
currInput := &v1.V1StepRunData{}
if task.Input != nil {
err := json.Unmarshal(task.Input, currInput)
if err != nil {
d.l.Warn().Err(err).Msg("failed to unmarshal input")
continue
}
}
readableIdToData := make(map[string]map[string]interface{})
for _, outputEvent := range parentData {
outputMap := make(map[string]interface{})
if outputEvent.Output != nil {
err := json.Unmarshal(outputEvent.Output, &outputMap)
if err != nil {
d.l.Warn().Err(err).Msg("failed to unmarshal output")
continue
}
}
readableIdToData[outputEvent.StepReadableID] = outputMap
}
currInput.Parents = readableIdToData
task.Input = currInput.Bytes()
}
}
taskIdToData := make(map[int64]*sqlcv1.V1Task)
for _, task := range bulkDatas {
+1 -2
View File
@@ -84,8 +84,7 @@ func (s *sharedRepository) ToV1StepRunData(t *TaskInput) *V1StepRunData {
err := json.Unmarshal(data.Output, &dataMap)
if err != nil {
s.l.Error().Err(err).Msg("failed to unmarshal output")
continue
s.l.Warn().Err(err).Msg("failed to unmarshal output")
}
parents[stepReadableId] = dataMap
+4 -1
View File
@@ -390,7 +390,10 @@ WITH latest_retry_count AS (
WHERE parent_task_external_id = (
SELECT external_id
FROM v1_tasks_olap
WHERE id = @taskId::bigint
WHERE
tenant_id = @tenantId::uuid
AND id = @taskId::bigint
AND inserted_at = @taskInsertedAt::timestamptz
LIMIT 1
)
)
+4 -1
View File
@@ -954,7 +954,10 @@ WITH latest_retry_count AS (
WHERE parent_task_external_id = (
SELECT external_id
FROM v1_tasks_olap
WHERE id = $2::bigint
WHERE
tenant_id = $1::uuid
AND id = $2::bigint
AND inserted_at = $3::timestamptz
LIMIT 1
)
)
+73
View File
@@ -729,6 +729,79 @@ JOIN
LEFT JOIN
step_orders so ON so.step_id = t.step_id;
-- name: ListTaskParentOutputs :many
-- Lists the outputs of parent steps for a list of tasks. This is recursive because it looks at all grandparents
-- of the tasks as well.
WITH RECURSIVE augmented_tasks AS (
-- First, select the tasks from the input
-- (base case: the (id, inserted_at) pairs come from two parallel unnested
-- arrays, and the tenant filter scopes the lookup to a single tenant).
SELECT
id,
inserted_at,
retry_count,
tenant_id,
dag_id,
dag_inserted_at,
step_readable_id,
workflow_run_id,
step_id,
workflow_id
FROM
v1_task
WHERE
(id, inserted_at) IN (
SELECT
unnest(@taskIds::bigint[]),
unnest(@taskInsertedAts::timestamptz[])
)
AND tenant_id = @tenantId::uuid
UNION
-- Then, select the tasks that are parents of the input tasks
-- (recursive step: walk the DAG the task belongs to and pick the tasks whose
-- step precedes the current step per "_StepOrder"; UNION (not UNION ALL)
-- deduplicates and terminates the recursion on cycles).
-- NOTE(review): the join assumes so."A" is the parent step and so."B" the
-- child — confirm against the "_StepOrder" schema.
SELECT
t.id,
t.inserted_at,
t.retry_count,
t.tenant_id,
t.dag_id,
t.dag_inserted_at,
t.step_readable_id,
t.workflow_run_id,
t.step_id,
t.workflow_id
FROM
augmented_tasks at
JOIN
"Step" s1 ON s1."id" = at.step_id
JOIN
v1_dag_to_task dt ON dt.dag_id = at.dag_id
JOIN
v1_task t ON t.id = dt.task_id
JOIN
"Step" s2 ON s2."id" = t.step_id
JOIN
"_StepOrder" so ON so."A" = s2."id" AND so."B" = s1."id"
)
-- Emit at most one COMPLETED output event per (task, attempt); DISTINCT ON
-- collapses duplicate event rows for the same retry_count.
SELECT
DISTINCT ON (at.id, at.inserted_at, at.retry_count)
at.id,
at.inserted_at,
at.retry_count,
at.tenant_id,
at.dag_id,
at.dag_inserted_at,
at.step_readable_id,
at.workflow_run_id,
at.step_id,
at.workflow_id,
e.data AS output
FROM
augmented_tasks at
JOIN
v1_task_event e ON e.task_id = at.id AND e.task_inserted_at = at.inserted_at AND e.retry_count = at.retry_count
WHERE
e.event_type = 'COMPLETED';
-- name: LockDAGsForReplay :many
-- Locks a list of DAGs for replay. Returns successfully locked DAGs which can be replayed.
SELECT
+126
View File
@@ -751,6 +751,132 @@ func (q *Queries) ListTaskMetas(ctx context.Context, db DBTX, arg ListTaskMetasP
return items, nil
}
// listTaskParentOutputs is the sqlc-generated SQL backing
// Queries.ListTaskParentOutputs. Generated code: edit the source .sql query
// and regenerate rather than changing this string by hand.
const listTaskParentOutputs = `-- name: ListTaskParentOutputs :many
WITH RECURSIVE augmented_tasks AS (
    -- First, select the tasks from the input
    SELECT
        id,
        inserted_at,
        retry_count,
        tenant_id,
        dag_id,
        dag_inserted_at,
        step_readable_id,
        workflow_run_id,
        step_id,
        workflow_id
    FROM
        v1_task
    WHERE
        (id, inserted_at) IN (
            SELECT
                unnest($1::bigint[]),
                unnest($2::timestamptz[])
        )
        AND tenant_id = $3::uuid
    UNION
    -- Then, select the tasks that are parents of the input tasks
    SELECT
        t.id,
        t.inserted_at,
        t.retry_count,
        t.tenant_id,
        t.dag_id,
        t.dag_inserted_at,
        t.step_readable_id,
        t.workflow_run_id,
        t.step_id,
        t.workflow_id
    FROM
        augmented_tasks at
    JOIN
        "Step" s1 ON s1."id" = at.step_id
    JOIN
        v1_dag_to_task dt ON dt.dag_id = at.dag_id
    JOIN
        v1_task t ON t.id = dt.task_id
    JOIN
        "Step" s2 ON s2."id" = t.step_id
    JOIN
        "_StepOrder" so ON so."A" = s2."id" AND so."B" = s1."id"
)
SELECT
    DISTINCT ON (at.id, at.inserted_at, at.retry_count)
    at.id,
    at.inserted_at,
    at.retry_count,
    at.tenant_id,
    at.dag_id,
    at.dag_inserted_at,
    at.step_readable_id,
    at.workflow_run_id,
    at.step_id,
    at.workflow_id,
    e.data AS output
FROM
    augmented_tasks at
JOIN
    v1_task_event e ON e.task_id = at.id AND e.task_inserted_at = at.inserted_at AND e.retry_count = at.retry_count
WHERE
    e.event_type = 'COMPLETED'
`
// ListTaskParentOutputsParams holds the arguments for
// Queries.ListTaskParentOutputs: parallel taskids/taskinsertedats arrays
// identifying the tasks, scoped to a single tenant. Generated by sqlc.
type ListTaskParentOutputsParams struct {
Taskids []int64 `json:"taskids"`
Taskinsertedats []pgtype.Timestamptz `json:"taskinsertedats"`
Tenantid pgtype.UUID `json:"tenantid"`
}
// ListTaskParentOutputsRow is one result row of Queries.ListTaskParentOutputs:
// a task (the input task or one of its ancestors) together with the raw data
// of its COMPLETED task event in Output. Generated by sqlc.
type ListTaskParentOutputsRow struct {
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
RetryCount int32 `json:"retry_count"`
TenantID pgtype.UUID `json:"tenant_id"`
DagID pgtype.Int8 `json:"dag_id"`
DagInsertedAt pgtype.Timestamptz `json:"dag_inserted_at"`
StepReadableID string `json:"step_readable_id"`
WorkflowRunID pgtype.UUID `json:"workflow_run_id"`
StepID pgtype.UUID `json:"step_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
Output []byte `json:"output"`
}
// Lists the outputs of parent steps for a list of tasks. This is recursive because it looks at all grandparents
// of the tasks as well.
//
// NOTE(review): sqlc-generated; regenerate from the .sql query rather than
// editing this function by hand.
func (q *Queries) ListTaskParentOutputs(ctx context.Context, db DBTX, arg ListTaskParentOutputsParams) ([]*ListTaskParentOutputsRow, error) {
rows, err := db.Query(ctx, listTaskParentOutputs, arg.Taskids, arg.Taskinsertedats, arg.Tenantid)
if err != nil {
return nil, err
}
defer rows.Close()
// Scan each row; the destination order must match the query's SELECT list.
var items []*ListTaskParentOutputsRow
for rows.Next() {
var i ListTaskParentOutputsRow
if err := rows.Scan(
&i.ID,
&i.InsertedAt,
&i.RetryCount,
&i.TenantID,
&i.DagID,
&i.DagInsertedAt,
&i.StepReadableID,
&i.WorkflowRunID,
&i.StepID,
&i.WorkflowID,
&i.Output,
); err != nil {
return nil, err
}
items = append(items, &i)
}
// rows.Err surfaces any error hit during iteration (e.g. a dropped connection).
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listTasks = `-- name: ListTasks :many
SELECT
id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff
+78 -21
View File
@@ -200,6 +200,10 @@ type TaskRepository interface {
ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
// ListTaskParentOutputs is a method to return the output of a task's parent and grandparent tasks. This is for v0 compatibility
// with the v1 engine, and shouldn't be called from new v1 endpoints.
ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
ProcessTaskTimeouts(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskTimeoutsRow, bool, error)
ProcessTaskReassignments(ctx context.Context, tenantId string) ([]*sqlcv1.ProcessTaskReassignmentsRow, bool, error)
@@ -579,27 +583,6 @@ func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, fai
defer rollback()
// release queue items
releasedTasks, err := r.releaseTasks(ctx, tx, tenantId, tasks)
if err != nil {
return nil, err
}
if len(tasks) != len(releasedTasks) {
return nil, fmt.Errorf("failed to release all tasks")
}
datas := make([][]byte, len(releasedTasks))
externalIds := make([]string, len(releasedTasks))
for i, releasedTask := range releasedTasks {
out := NewFailedTaskOutputEvent(releasedTask, failureOpts[i].ErrorMessage)
datas[i] = out.Bytes()
externalIds[i] = sqlchelpers.UUIDToStr(releasedTask.ExternalID)
}
retriedTasks := make([]RetriedTask, 0)
// write app failures
@@ -653,6 +636,29 @@ func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, fai
}
}
// release queue items
// NOTE: it's important that we do this after we've written the retries, as some of the triggers for concurrency
// slots case on the retry queue item's existence.
releasedTasks, err := r.releaseTasks(ctx, tx, tenantId, tasks)
if err != nil {
return nil, err
}
if len(tasks) != len(releasedTasks) {
return nil, fmt.Errorf("failed to release all tasks")
}
datas := make([][]byte, len(releasedTasks))
externalIds := make([]string, len(releasedTasks))
for i, releasedTask := range releasedTasks {
out := NewFailedTaskOutputEvent(releasedTask, failureOpts[i].ErrorMessage)
datas[i] = out.Bytes()
externalIds[i] = sqlchelpers.UUIDToStr(releasedTask.ExternalID)
}
// write task events
internalEvents, err := r.createTaskEvents(
ctx,
@@ -2605,3 +2611,54 @@ func uniqueSet(taskIdRetryCounts []TaskIdInsertedAtRetryCount) []TaskIdInsertedA
return res
}
// ListTaskParentOutputs returns, for each input task id, the parsed COMPLETED
// output events of the tasks in the same workflow run (its parents and
// grandparents per the recursive query). Outputs that fail to parse are
// skipped with a warning rather than failing the whole batch.
//
// NOTE(review): tasks sharing a workflow run id receive the SAME slice value
// in the result map — callers must not mutate the returned slices.
func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error) {
	resMap := make(map[int64][]*TaskOutputEvent, len(tasks))

	// Nothing to look up — avoid a pointless database round-trip.
	if len(tasks) == 0 {
		return resMap, nil
	}

	taskIds := make([]int64, len(tasks))
	taskInsertedAts := make([]pgtype.Timestamptz, len(tasks))

	for i, task := range tasks {
		taskIds[i] = task.ID
		taskInsertedAts[i] = task.InsertedAt
	}

	res, err := r.queries.ListTaskParentOutputs(ctx, r.pool, sqlcv1.ListTaskParentOutputsParams{
		Tenantid:        sqlchelpers.UUIDFromStr(tenantId),
		Taskids:         taskIds,
		Taskinsertedats: taskInsertedAts,
	})

	if err != nil {
		return nil, err
	}

	// Group the parsed output events by workflow run id; rows without a valid
	// workflow run id cannot be attributed to an input task and are skipped.
	workflowRunIdsToOutputs := make(map[string][]*TaskOutputEvent)

	for _, outputTask := range res {
		if !outputTask.WorkflowRunID.Valid {
			continue
		}

		wrId := sqlchelpers.UUIDToStr(outputTask.WorkflowRunID)

		e, err := newTaskEventFromBytes(outputTask.Output)

		if err != nil {
			// Best-effort: a single unparseable output should not fail the batch.
			r.l.Warn().Err(err).Msg("failed to parse task output")
			continue
		}

		workflowRunIdsToOutputs[wrId] = append(workflowRunIdsToOutputs[wrId], e)
	}

	// Attribute each input task's workflow-run outputs back to the task id.
	for _, task := range tasks {
		if !task.WorkflowRunID.Valid {
			continue
		}

		wrId := sqlchelpers.UUIDToStr(task.WorkflowRunID)

		if events, ok := workflowRunIdsToOutputs[wrId]; ok {
			resMap[task.ID] = events
		}
	}

	return resMap, nil
}
+40 -17
View File
@@ -517,17 +517,31 @@ RETURNS trigger AS $$
BEGIN
WITH parent_slot AS (
SELECT
cs.workflow_id, cs.workflow_version_id, cs.workflow_run_id, cs.strategy_id, cs.parent_strategy_id
*
FROM
new_table cs
WHERE
cs.parent_strategy_id IS NOT NULL
), parent_to_child_strategy_ids AS (
SELECT
wc.child_strategy_ids, wc.id
wc.id AS parent_strategy_id,
wc.tenant_id,
ps.workflow_id,
ps.workflow_version_id,
ps.workflow_run_id,
MAX(ps.sort_id) AS sort_id,
MAX(ps.priority) AS priority,
MAX(ps.key) AS key,
ARRAY_AGG(DISTINCT wc.child_strategy_ids) AS child_strategy_ids
FROM
parent_slot ps
JOIN v1_workflow_concurrency wc ON wc.workflow_id = ps.workflow_id AND wc.workflow_version_id = ps.workflow_version_id AND wc.id = ps.parent_strategy_id
GROUP BY
wc.id,
wc.tenant_id,
ps.workflow_id,
ps.workflow_version_id,
ps.workflow_run_id
)
INSERT INTO v1_workflow_concurrency_slot (
sort_id,
@@ -541,22 +555,25 @@ BEGIN
key
)
SELECT
cs.sort_id,
cs.tenant_id,
cs.workflow_id,
cs.workflow_version_id,
cs.workflow_run_id,
cs.parent_strategy_id,
pcs.sort_id,
pcs.tenant_id,
pcs.workflow_id,
pcs.workflow_version_id,
pcs.workflow_run_id,
pcs.parent_strategy_id,
pcs.child_strategy_ids,
cs.priority,
cs.key
pcs.priority,
pcs.key
FROM
new_table cs
JOIN
parent_to_child_strategy_ids pcs ON pcs.id = cs.parent_strategy_id
WHERE
cs.parent_strategy_id IS NOT NULL
ON CONFLICT (strategy_id, workflow_version_id, workflow_run_id) DO NOTHING;
parent_to_child_strategy_ids pcs
ON CONFLICT (strategy_id, workflow_version_id, workflow_run_id) DO UPDATE
-- If there's a conflict, and we're inserting a new concurrency_slot, we'd like to remove the strategy_id
-- from the completed child strategy ids.
SET completed_child_strategy_ids = ARRAY(
SELECT DISTINCT UNNEST(ARRAY_REMOVE(v1_workflow_concurrency_slot.completed_child_strategy_ids, cs.strategy_id))
FROM new_table cs
WHERE EXCLUDED.strategy_id = cs.parent_strategy_id
);
-- If the v1_step_concurrency strategy is not active, we set it to active.
WITH inactive_strategies AS (
@@ -595,7 +612,8 @@ EXECUTE FUNCTION after_v1_concurrency_slot_insert_function();
CREATE OR REPLACE FUNCTION after_v1_concurrency_slot_delete_function()
RETURNS trigger AS $$
BEGIN
-- When v1_concurrency_slot is DELETED, we add it to the completed_child_strategy_ids on the parent.
-- When v1_concurrency_slot is DELETED, we add it to the completed_child_strategy_ids on the parent, but only
-- when it's NOT a backoff retry. Backoff retries will continue to consume a workflow concurrency slot.
WITH parent_slot AS (
SELECT
cs.workflow_id,
@@ -605,8 +623,13 @@ BEGIN
cs.parent_strategy_id
FROM
deleted_rows cs
JOIN
v1_task t ON t.id = cs.task_id AND t.inserted_at = cs.task_inserted_at
LEFT JOIN
v1_retry_queue_item rqi ON rqi.task_id = t.id AND rqi.task_inserted_at = t.inserted_at
WHERE
cs.parent_strategy_id IS NOT NULL
AND rqi.task_id IS NULL
), locked_parent_slots AS (
SELECT
wcs.strategy_id,