debug: more deadlocking

mrkaye97
2025-08-20 15:44:37 -04:00
parent f26912c727
commit ea57344e7d
6 changed files with 124 additions and 291 deletions

View File

@@ -283,18 +283,31 @@ FROM
 -- name: RunParentCancelInProgress :exec
-WITH eligible_running_slots AS (
+WITH locked_workflow_concurrency_slots AS (
+    SELECT *
+    FROM v1_workflow_concurrency_slot
+    WHERE (strategy_id, workflow_version_id, workflow_run_id) IN (
+        SELECT
+            strategy_id,
+            workflow_version_id,
+            workflow_run_id
+        FROM
+            tmp_workflow_concurrency_slot
+    )
+    ORDER BY strategy_id, workflow_version_id, workflow_run_id
+    FOR UPDATE
+), eligible_running_slots AS (
     SELECT wsc.*
     FROM (
         SELECT DISTINCT key
-        FROM tmp_workflow_concurrency_slot
+        FROM locked_workflow_concurrency_slots
         WHERE
             tenant_id = @tenantId::uuid
             AND strategy_id = @strategyId::bigint
     ) distinct_keys
     JOIN LATERAL (
         SELECT *
-        FROM tmp_workflow_concurrency_slot wcs_all
+        FROM locked_workflow_concurrency_slots wcs_all
         WHERE
             wcs_all.key = distinct_keys.key
             AND wcs_all.tenant_id = @tenantId::uuid
@@ -318,7 +331,7 @@ WITH eligible_running_slots AS (
     )
     ORDER BY
         strategy_id, workflow_version_id, workflow_run_id
-    FOR UPDATE
+    FOR UPDATE SKIP LOCKED
 ), update_tmp_table AS (
     UPDATE
         tmp_workflow_concurrency_slot wsc
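
The fix above targets a classic lock-ordering deadlock: two transactions that lock overlapping sets of concurrency-slot rows in different orders can each end up waiting on a row the other holds. A minimal sketch of the failure, assuming a hypothetical table t with rows 'a' and 'b':

-- tx1: SELECT * FROM t WHERE id = 'a' FOR UPDATE;
-- tx2: SELECT * FROM t WHERE id = 'b' FOR UPDATE;
-- tx1: SELECT * FROM t WHERE id = 'b' FOR UPDATE;  -- blocks on tx2
-- tx2: SELECT * FROM t WHERE id = 'a' FOR UPDATE;  -- deadlock detected

-- The cure: take every needed lock up front, in one statement, with a
-- stable sort, so all transactions acquire locks in the same order:
SELECT * FROM t WHERE id IN ('a', 'b') ORDER BY id FOR UPDATE;

The new locked_workflow_concurrency_slots CTE does exactly this for the slot rows, and the FOR UPDATE SKIP LOCKED in the second hunk goes a step further: a competing worker skips rows that are already locked instead of queueing behind them, which suits many workers draining the same set of slots.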

View File

@@ -830,18 +830,31 @@ func (q *Queries) RunGroupRoundRobin(ctx context.Context, db DBTX, arg RunGroupR
 }
 
 const runParentCancelInProgress = `-- name: RunParentCancelInProgress :exec
-WITH eligible_running_slots AS (
+WITH locked_workflow_concurrency_slots AS (
+    SELECT sort_id, tenant_id, workflow_id, workflow_version_id, workflow_run_id, strategy_id, completed_child_strategy_ids, child_strategy_ids, priority, key, is_filled
+    FROM v1_workflow_concurrency_slot
+    WHERE (strategy_id, workflow_version_id, workflow_run_id) IN (
+        SELECT
+            strategy_id,
+            workflow_version_id,
+            workflow_run_id
+        FROM
+            tmp_workflow_concurrency_slot
+    )
+    ORDER BY strategy_id, workflow_version_id, workflow_run_id
+    FOR UPDATE
+), eligible_running_slots AS (
     SELECT wsc.sort_id, wsc.tenant_id, wsc.workflow_id, wsc.workflow_version_id, wsc.workflow_run_id, wsc.strategy_id, wsc.completed_child_strategy_ids, wsc.child_strategy_ids, wsc.priority, wsc.key, wsc.is_filled
     FROM (
         SELECT DISTINCT key
-        FROM tmp_workflow_concurrency_slot
+        FROM locked_workflow_concurrency_slots
        WHERE
             tenant_id = $1::uuid
             AND strategy_id = $2::bigint
     ) distinct_keys
     JOIN LATERAL (
         SELECT sort_id, tenant_id, workflow_id, workflow_version_id, workflow_run_id, strategy_id, completed_child_strategy_ids, child_strategy_ids, priority, key, is_filled
-        FROM tmp_workflow_concurrency_slot wcs_all
+        FROM locked_workflow_concurrency_slots wcs_all
         WHERE
             wcs_all.key = distinct_keys.key
             AND wcs_all.tenant_id = $1::uuid
@@ -865,7 +878,7 @@ WITH eligible_running_slots AS (
     )
     ORDER BY
         strategy_id, workflow_version_id, workflow_run_id
-    FOR UPDATE
+    FOR UPDATE SKIP LOCKED
 ), update_tmp_table AS (
     UPDATE
         tmp_workflow_concurrency_slot wsc

View File

@@ -2,6 +2,7 @@ package sqlcv1
 
 import (
 	"context"
+	"encoding/json"
 
 	"github.com/jackc/pgx/v5/pgtype"
 )
@@ -33,9 +34,9 @@ WITH input AS (
         -- NOTE: these are nullable, so sqlc doesn't support casting to a type
         unnest($18::bigint[]) AS dag_id,
         unnest($19::timestamptz[]) AS dag_inserted_at,
-        unnest_nd_1d($20::bigint[][]) AS concurrency_parent_strategy_ids,
-        unnest_nd_1d($21::bigint[][]) AS concurrency_strategy_ids,
-        unnest_nd_1d($22::text[][]) AS concurrency_keys,
+        translate(jsonb_array_elements($20::jsonb)::text, '[]', '{}')::bigint[] AS concurrency_parent_strategy_ids,
+        translate(jsonb_array_elements($21::jsonb)::text, '[]', '{}')::bigint[] AS concurrency_strategy_ids,
+        translate(jsonb_array_elements($22::jsonb)::text, '[]', '{}')::text[] AS concurrency_keys,
         unnest($23::text[]) AS initial_state_reason,
         unnest($24::uuid[]) AS parent_task_external_id,
         unnest($25::bigint[]) AS parent_task_id,
@@ -177,6 +178,21 @@ func (q *Queries) CreateTasks(ctx context.Context, db DBTX, arg CreateTasksParam
 	// }
 	// }()
+	parentStrategyIds, err := json.Marshal(arg.Concurrencyparentstrategyids)
+	if err != nil {
+		return nil, err
+	}
+
+	keys, err := json.Marshal(arg.ConcurrencyKeys)
+	if err != nil {
+		return nil, err
+	}
+
+	strategyIds, err := json.Marshal(arg.ConcurrencyStrategyIds)
+	if err != nil {
+		return nil, err
+	}
+
 	rows, err := db.Query(ctx, createTasks,
 		arg.Tenantids,
 		arg.Queues,
@@ -197,9 +213,9 @@ func (q *Queries) CreateTasks(ctx context.Context, db DBTX, arg CreateTasksParam
 		arg.InitialStates,
 		arg.Dagids,
 		arg.Daginsertedats,
-		arg.Concurrencyparentstrategyids,
-		arg.ConcurrencyStrategyIds,
-		arg.ConcurrencyKeys,
+		parentStrategyIds,
+		strategyIds,
+		keys,
 		arg.InitialStateReasons,
 		arg.ParentTaskExternalIds,
 		arg.ParentTaskIds,
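
The generated query now round-trips the per-task concurrency lists through JSON instead of multidimensional array parameters: the Go side json.Marshals each [][]T slice, and the SQL splits the outer JSON array with jsonb_array_elements, then rewrites each element's brackets to braces so it parses as a one-dimensional Postgres array. A standalone sketch of the decode step, with illustrative values:

SELECT translate(elem::text, '[]', '{}')::bigint[] AS strategy_ids
FROM jsonb_array_elements('[[1, 2], [3]]'::jsonb) AS elem;
-- two rows: {1,2} and {3}

One likely motivation (an inference, not stated in the diff): JSON arrays may be ragged, while Postgres multidimensional arrays must be rectangular, so per-task lists of different lengths survive the trip. Note that for the text[][] case the translate trick assumes keys never contain square brackets.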

View File

@@ -1,193 +0,0 @@
-- NOTE: this file doesn't typically get generated, since we need to overwrite the
-- behavior of `@dagIds` and `@dagInsertedAts` to be nullable. It can be generated
-- when we'd like to change the query.
-- name: CreateTasks :many
WITH input AS (
SELECT
*
FROM
(
SELECT
unnest(@tenantIds::uuid[]) AS tenant_id,
unnest(@queues::text[]) AS queue,
unnest(@actionIds::text[]) AS action_id,
unnest(@stepIds::uuid[]) AS step_id,
unnest(@stepReadableIds::text[]) AS step_readable_id,
unnest(@workflowIds::uuid[]) AS workflow_id,
unnest(@scheduleTimeouts::text[]) AS schedule_timeout,
unnest(@stepTimeouts::text[]) AS step_timeout,
unnest(@priorities::integer[]) AS priority,
unnest(cast(@stickies::text[] as v1_sticky_strategy[])) AS sticky,
unnest(@desiredWorkerIds::uuid[]) AS desired_worker_id,
unnest(@externalIds::uuid[]) AS external_id,
unnest(@displayNames::text[]) AS display_name,
unnest(@inputs::jsonb[]) AS input,
unnest(@retryCounts::integer[]) AS retry_count,
unnest(@additionalMetadatas::jsonb[]) AS additional_metadata,
unnest(cast(@initialStates::text[] as v1_task_initial_state[])) AS initial_state,
-- NOTE: these are nullable, so sqlc doesn't support casting to a type
unnest(@dagIds::bigint[]) AS dag_id,
unnest(@dagInsertedAts::timestamptz[]) AS dag_inserted_at
) AS subquery
)
INSERT INTO v1_task (
tenant_id,
queue,
action_id,
step_id,
step_readable_id,
workflow_id,
schedule_timeout,
step_timeout,
priority,
sticky,
desired_worker_id,
external_id,
display_name,
input,
retry_count,
additional_metadata,
initial_state,
dag_id,
dag_inserted_at
)
SELECT
i.tenant_id,
i.queue,
i.action_id,
i.step_id,
i.step_readable_id,
i.workflow_id,
i.schedule_timeout,
i.step_timeout,
i.priority,
i.sticky,
i.desired_worker_id,
i.external_id,
i.display_name,
i.input,
i.retry_count,
i.additional_metadata,
i.initial_state,
i.dag_id,
i.dag_inserted_at
FROM
input i
RETURNING
*;
-- name: CreateTaskEvents :exec
-- We get a FOR UPDATE lock on tasks to prevent concurrent writes to the task events
-- tables for each task
WITH locked_tasks AS (
SELECT
id
FROM
v1_task
WHERE
id = ANY(@taskIds::bigint[])
AND tenant_id = @tenantId::uuid
-- order by the task id to get a stable lock order
ORDER BY
id
FOR UPDATE
), input AS (
SELECT
*
FROM
(
SELECT
unnest(@taskIds::bigint[]) AS task_id,
unnest(@retryCounts::integer[]) AS retry_count,
unnest(cast(@eventTypes::text[] as v1_task_event_type[])) AS event_type,
unnest(@eventKeys::text[]) AS event_key,
unnest(@datas::jsonb[]) AS data
) AS subquery
)
INSERT INTO v1_task_event (
tenant_id,
task_id,
retry_count,
event_type,
event_key,
data
)
SELECT
@tenantId::uuid,
i.task_id,
i.retry_count,
i.event_type,
i.event_key,
i.data
FROM
input i
ON CONFLICT (tenant_id, task_id, event_type, event_key) WHERE event_key IS NOT NULL DO NOTHING;
-- name: ReplayTasks :many
-- NOTE: at this point, we assume we have a lock on tasks and therefore we can update the tasks
WITH input AS (
SELECT
*
FROM
(
SELECT
unnest(@taskIds::bigint[]) AS task_id,
unnest(@inputs::jsonb[]) AS input,
unnest(cast(@initialStates::text[] as v1_task_initial_state[])) AS initial_state,
unnest_nd_1d(@concurrencyStrategyIds::bigint[][]) AS concurrency_strategy_ids,
unnest_nd_1d(@concurrencyKeys::text[][]) AS concurrency_keys,
unnest(@initialStateReason::text[]) AS initial_state_reason
) AS subquery
)
UPDATE
v1_task
SET
retry_count = retry_count + 1,
app_retry_count = 0,
internal_retry_count = 0,
input = CASE WHEN i.input IS NOT NULL THEN i.input ELSE v1_task.input END,
initial_state = i.initial_state,
concurrency_strategy_ids = i.concurrency_strategy_ids,
concurrency_keys = i.concurrency_keys,
initial_state_reason = i.initial_state_reason
FROM
input i
WHERE
v1_task.id = i.task_id
RETURNING
v1_task.*;
-- name: CreateTaskExpressionEvals :exec
WITH input AS (
SELECT
*
FROM
(
SELECT
unnest(@taskIds::bigint[]) AS task_id,
unnest(@taskInsertedAts::timestamptz[]) AS task_inserted_at,
unnest(@keys::text[]) AS key,
unnest(@valuesStr::text[]) AS value_str,
unnest(cast(@kinds::text[] as "StepExpressionKind"[])) AS kind
) AS subquery
)
INSERT INTO v1_task_expression_eval (
key,
task_id,
task_inserted_at,
value_str,
kind
)
SELECT
i.key,
i.task_id,
i.task_inserted_at,
i.value_str,
i.kind
FROM
input i
ON CONFLICT (key, task_id, task_inserted_at, kind) DO UPDATE
SET
value_str = EXCLUDED.value_str,
value_int = EXCLUDED.value_int;
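
The deleted CreateTaskEvents query above shows the same discipline applied to task events: take FOR UPDATE locks on the parent tasks in a stable id order before writing dependent rows, so concurrent writers over overlapping task sets cannot acquire those locks in conflicting orders. A standalone sketch with hypothetical ids (the final SELECT references the CTE so its locks are actually taken; an unreferenced SELECT CTE may never be evaluated):

WITH locked_tasks AS (
    SELECT id
    FROM v1_task
    WHERE id = ANY(ARRAY[101, 102]::bigint[])
    ORDER BY id
    FOR UPDATE
)
SELECT count(*) FROM locked_tasks;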

View File

@@ -1902,50 +1902,43 @@ func (r *sharedRepository) insertTasks(
return nil, fmt.Errorf("failed to upsert queues: %w", err)
}
// group by step_id
stepIdsToParams := make(map[string]sqlcv1.CreateTasksParams, 0)
for i, task := range tasks {
params, ok := stepIdsToParams[task.StepId]
if !ok {
params = sqlcv1.CreateTasksParams{
Tenantids: make([]pgtype.UUID, 0),
Queues: make([]string, 0),
Actionids: make([]string, 0),
Stepids: make([]pgtype.UUID, 0),
Stepreadableids: make([]string, 0),
Workflowids: make([]pgtype.UUID, 0),
Scheduletimeouts: make([]string, 0),
Steptimeouts: make([]string, 0),
Priorities: make([]int32, 0),
Stickies: make([]string, 0),
Desiredworkerids: make([]pgtype.UUID, 0),
Externalids: make([]pgtype.UUID, 0),
Displaynames: make([]string, 0),
Inputs: make([][]byte, 0),
Retrycounts: make([]int32, 0),
Additionalmetadatas: make([][]byte, 0),
InitialStates: make([]string, 0),
InitialStateReasons: make([]pgtype.Text, 0),
Dagids: make([]pgtype.Int8, 0),
Daginsertedats: make([]pgtype.Timestamptz, 0),
Concurrencyparentstrategyids: make([][]pgtype.Int8, 0),
ConcurrencyStrategyIds: make([][]int64, 0),
ConcurrencyKeys: make([][]string, 0),
ParentTaskExternalIds: make([]pgtype.UUID, 0),
ParentTaskIds: make([]pgtype.Int8, 0),
ParentTaskInsertedAts: make([]pgtype.Timestamptz, 0),
ChildIndex: make([]pgtype.Int8, 0),
ChildKey: make([]pgtype.Text, 0),
StepIndex: make([]int64, 0),
RetryBackoffFactor: make([]pgtype.Float8, 0),
RetryMaxBackoff: make([]pgtype.Int4, 0),
WorkflowVersionIds: make([]pgtype.UUID, 0),
WorkflowRunIds: make([]pgtype.UUID, 0),
}
}
params := sqlcv1.CreateTasksParams{
Tenantids: make([]pgtype.UUID, 0),
Queues: make([]string, 0),
Actionids: make([]string, 0),
Stepids: make([]pgtype.UUID, 0),
Stepreadableids: make([]string, 0),
Workflowids: make([]pgtype.UUID, 0),
Scheduletimeouts: make([]string, 0),
Steptimeouts: make([]string, 0),
Priorities: make([]int32, 0),
Stickies: make([]string, 0),
Desiredworkerids: make([]pgtype.UUID, 0),
Externalids: make([]pgtype.UUID, 0),
Displaynames: make([]string, 0),
Inputs: make([][]byte, 0),
Retrycounts: make([]int32, 0),
Additionalmetadatas: make([][]byte, 0),
InitialStates: make([]string, 0),
InitialStateReasons: make([]pgtype.Text, 0),
Dagids: make([]pgtype.Int8, 0),
Daginsertedats: make([]pgtype.Timestamptz, 0),
Concurrencyparentstrategyids: make([][]pgtype.Int8, 0),
ConcurrencyStrategyIds: make([][]int64, 0),
ConcurrencyKeys: make([][]string, 0),
ParentTaskExternalIds: make([]pgtype.UUID, 0),
ParentTaskIds: make([]pgtype.Int8, 0),
ParentTaskInsertedAts: make([]pgtype.Timestamptz, 0),
ChildIndex: make([]pgtype.Int8, 0),
ChildKey: make([]pgtype.Text, 0),
StepIndex: make([]int64, 0),
RetryBackoffFactor: make([]pgtype.Float8, 0),
RetryMaxBackoff: make([]pgtype.Int4, 0),
WorkflowVersionIds: make([]pgtype.UUID, 0),
WorkflowRunIds: make([]pgtype.UUID, 0),
}
for i := range tasks {
params.Tenantids = append(params.Tenantids, tenantIds[i])
params.Queues = append(params.Queues, queues[i])
params.Actionids = append(params.Actionids, actionIds[i])
@@ -1979,51 +1972,43 @@ func (r *sharedRepository) insertTasks(
 		params.RetryMaxBackoff = append(params.RetryMaxBackoff, retryMaxBackoffs[i])
 		params.WorkflowVersionIds = append(params.WorkflowVersionIds, workflowVersionIds[i])
 		params.WorkflowRunIds = append(params.WorkflowRunIds, workflowRunIds[i])
-
-		stepIdsToParams[task.StepId] = params
 	}
 
-	res := make([]*sqlcv1.V1Task, 0)
-
 	// for any initial states which are not queued, create a finalizing task event
 	eventTaskIdRetryCounts := make([]TaskIdInsertedAtRetryCount, 0)
 	eventTaskExternalIds := make([]string, 0)
 	eventDatas := make([][]byte, 0)
 	eventTypes := make([]sqlcv1.V1TaskEventType, 0)
 
-	for stepId, params := range stepIdsToParams {
-		createdTasks, err := r.queries.CreateTasks(ctx, tx, params)
-
-		if err != nil {
-			return nil, fmt.Errorf("failed to create tasks for step id %s: %w", stepId, err)
-		}
-
-		res = append(res, createdTasks...)
-
-		for _, createdTask := range createdTasks {
-			idRetryCount := TaskIdInsertedAtRetryCount{
-				Id:         createdTask.ID,
-				InsertedAt: createdTask.InsertedAt,
-				RetryCount: createdTask.RetryCount,
-			}
-
-			switch createdTask.InitialState {
-			case sqlcv1.V1TaskInitialStateFAILED:
-				eventTaskIdRetryCounts = append(eventTaskIdRetryCounts, idRetryCount)
-				eventTaskExternalIds = append(eventTaskExternalIds, sqlchelpers.UUIDToStr(createdTask.ExternalID))
-				eventDatas = append(eventDatas, NewFailedTaskOutputEventFromTask(createdTask).Bytes())
-				eventTypes = append(eventTypes, sqlcv1.V1TaskEventTypeFAILED)
-			case sqlcv1.V1TaskInitialStateCANCELLED:
-				eventTaskIdRetryCounts = append(eventTaskIdRetryCounts, idRetryCount)
-				eventTaskExternalIds = append(eventTaskExternalIds, sqlchelpers.UUIDToStr(createdTask.ExternalID))
-				eventDatas = append(eventDatas, NewCancelledTaskOutputEventFromTask(createdTask).Bytes())
-				eventTypes = append(eventTypes, sqlcv1.V1TaskEventTypeCANCELLED)
-			case sqlcv1.V1TaskInitialStateSKIPPED:
-				eventTaskIdRetryCounts = append(eventTaskIdRetryCounts, idRetryCount)
-				eventTaskExternalIds = append(eventTaskExternalIds, sqlchelpers.UUIDToStr(createdTask.ExternalID))
-				eventDatas = append(eventDatas, NewSkippedTaskOutputEventFromTask(createdTask).Bytes())
-				eventTypes = append(eventTypes, sqlcv1.V1TaskEventTypeCOMPLETED)
-			}
-		}
-	}
+	createdTasks, err := r.queries.CreateTasks(ctx, tx, params)
+
+	if err != nil {
+		return nil, fmt.Errorf("failed to create tasks: %w", err)
+	}
+
+	for _, createdTask := range createdTasks {
+		idRetryCount := TaskIdInsertedAtRetryCount{
+			Id:         createdTask.ID,
+			InsertedAt: createdTask.InsertedAt,
+			RetryCount: createdTask.RetryCount,
+		}
+
+		switch createdTask.InitialState {
+		case sqlcv1.V1TaskInitialStateFAILED:
+			eventTaskIdRetryCounts = append(eventTaskIdRetryCounts, idRetryCount)
+			eventTaskExternalIds = append(eventTaskExternalIds, sqlchelpers.UUIDToStr(createdTask.ExternalID))
+			eventDatas = append(eventDatas, NewFailedTaskOutputEventFromTask(createdTask).Bytes())
+			eventTypes = append(eventTypes, sqlcv1.V1TaskEventTypeFAILED)
+		case sqlcv1.V1TaskInitialStateCANCELLED:
+			eventTaskIdRetryCounts = append(eventTaskIdRetryCounts, idRetryCount)
+			eventTaskExternalIds = append(eventTaskExternalIds, sqlchelpers.UUIDToStr(createdTask.ExternalID))
+			eventDatas = append(eventDatas, NewCancelledTaskOutputEventFromTask(createdTask).Bytes())
+			eventTypes = append(eventTypes, sqlcv1.V1TaskEventTypeCANCELLED)
+		case sqlcv1.V1TaskInitialStateSKIPPED:
+			eventTaskIdRetryCounts = append(eventTaskIdRetryCounts, idRetryCount)
+			eventTaskExternalIds = append(eventTaskExternalIds, sqlchelpers.UUIDToStr(createdTask.ExternalID))
+			eventDatas = append(eventDatas, NewSkippedTaskOutputEventFromTask(createdTask).Bytes())
+			eventTypes = append(eventTypes, sqlcv1.V1TaskEventTypeCOMPLETED)
+		}
+	}
@@ -2043,7 +2028,7 @@ func (r *sharedRepository) insertTasks(
 	}
 
 	if len(createExpressionOpts) > 0 {
-		err = r.createExpressionEvals(ctx, tx, res, createExpressionOpts)
+		err = r.createExpressionEvals(ctx, tx, createdTasks, createExpressionOpts)
 
 		if err != nil {
 			return nil, fmt.Errorf("failed to create expression evals: %w", err)
@@ -2053,7 +2038,7 @@ func (r *sharedRepository) insertTasks(
 	// TODO: this should be moved to after the transaction commits
 	saveQueueCache()
 
-	return res, nil
+	return createdTasks, nil
 }
// replayTasks updates tasks into the database. note that we're using Postgres rules to automatically insert the created
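
Collapsing the per-step map into one CreateTasksParams batch removes another source of nondeterministic lock order: Go map iteration order is randomized, so the old range over stepIdsToParams could run its per-step INSERTs in a different order in every transaction, which is exactly the pattern that produces lock-order deadlocks (that this was the trigger is an inference from the commit message, not stated in the diff). It also turns N statements into one set-based statement. A reduced sketch of the unnest batching pattern CreateTasks uses, against a hypothetical two-column table:

INSERT INTO example_task (queue, priority)
SELECT *
FROM unnest(
    ARRAY['default', 'default', 'bulk']::text[],
    ARRAY[1, 2, 3]::integer[]
) AS t(queue, priority);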

View File

@@ -737,6 +737,7 @@ BEGIN
         ps.workflow_id,
         ps.workflow_version_id,
         ps.workflow_run_id
+    ORDER BY wc.id, ps.workflow_version_id, ps.workflow_run_id
 )
INSERT INTO v1_workflow_concurrency_slot (
sort_id,
@@ -810,11 +811,9 @@ BEGIN
     FROM v1_workflow_concurrency_slot
     WHERE strategy_id = p_strategy_id
     AND workflow_version_id = p_workflow_version_id
-    AND workflow_run_id = p_workflow_run_id;
-
-    -- Acquire an advisory lock for the strategy ID
-    -- There is a small chance of collisions but it's extremely unlikely
-    PERFORM pg_advisory_xact_lock(1000000 * p_strategy_id + v_sort_id);
+    AND workflow_run_id = p_workflow_run_id
+    ORDER BY strategy_id, workflow_version_id, workflow_run_id
+    FOR UPDATE;
 
     WITH final_concurrency_slots_for_dags AS (
         -- If the workflow run id corresponds to a DAG, we get workflow concurrency slots
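
The final hunk trades the derived advisory lock for a row lock on the slot itself. The removed comment already conceded the collision risk: 1000000 * p_strategy_id + v_sort_id is not injective, e.g. with illustrative values, strategy 1 with sort id 2000000 and strategy 2 with sort id 1000000 both map to key 3000000, so unrelated runs could contend on the same lock. The replacement locks exactly the row in question, with an ORDER BY matching the acquisition order used by every other query in this commit. A sketch of the two styles, with placeholder values:

-- before: a derived advisory key, with possible collisions
SELECT pg_advisory_xact_lock(1000000 * 1 + 2000000);

-- after: lock the one slot row, in the shared sort order
SELECT *
FROM v1_workflow_concurrency_slot
WHERE strategy_id = 1
  AND workflow_version_id = '00000000-0000-0000-0000-000000000001'::uuid
  AND workflow_run_id = '00000000-0000-0000-0000-000000000002'::uuid
ORDER BY strategy_id, workflow_version_id, workflow_run_id
FOR UPDATE;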