From 2a8ba155fab971bd0916c5d990e3e06556b65455 Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Mon, 25 Aug 2025 12:54:08 -0400 Subject: [PATCH] fix: match and cancel newest/in progress deadlocks (#2190) --- .../migrations/20250822193023_v1_0_39.sql | 304 ++++++++++++++++++ pkg/repository/v1/sqlcv1/concurrency.sql | 38 ++- pkg/repository/v1/sqlcv1/concurrency.sql.go | 38 ++- pkg/repository/v1/sqlcv1/matches.sql | 33 +- pkg/repository/v1/sqlcv1/matches.sql.go | 33 +- pkg/repository/v1/sqlcv1/tasks.sql | 19 ++ pkg/repository/v1/sqlcv1/tasks.sql.go | 31 ++ pkg/repository/v1/task.go | 28 ++ sql/schema/v1-core.sql | 11 - 9 files changed, 476 insertions(+), 59 deletions(-) create mode 100644 cmd/hatchet-migrate/migrate/migrations/20250822193023_v1_0_39.sql diff --git a/cmd/hatchet-migrate/migrate/migrations/20250822193023_v1_0_39.sql b/cmd/hatchet-migrate/migrate/migrations/20250822193023_v1_0_39.sql new file mode 100644 index 000000000..eb82118ac --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20250822193023_v1_0_39.sql @@ -0,0 +1,304 @@ +-- +goose Up +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION v1_task_insert_function() +RETURNS TRIGGER AS $$ +DECLARE + rec RECORD; +BEGIN + WITH new_slot_rows AS ( + SELECT + id, + inserted_at, + retry_count, + tenant_id, + priority, + concurrency_parent_strategy_ids[1] AS parent_strategy_id, + CASE + WHEN array_length(concurrency_parent_strategy_ids, 1) > 1 THEN concurrency_parent_strategy_ids[2:array_length(concurrency_parent_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_parent_strategy_ids, + concurrency_strategy_ids[1] AS strategy_id, + external_id, + workflow_run_id, + CASE + WHEN array_length(concurrency_strategy_ids, 1) > 1 THEN concurrency_strategy_ids[2:array_length(concurrency_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_strategy_ids, + concurrency_keys[1] AS key, + CASE + WHEN array_length(concurrency_keys, 1) > 1 THEN concurrency_keys[2:array_length(concurrency_keys, 1)] + ELSE '{}'::text[] + END AS next_keys, + workflow_id, + workflow_version_id, + queue, + CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout) AS schedule_timeout_at + FROM new_table + WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NOT NULL + ) + INSERT INTO v1_concurrency_slot ( + task_id, + task_inserted_at, + task_retry_count, + external_id, + tenant_id, + workflow_id, + workflow_version_id, + workflow_run_id, + parent_strategy_id, + next_parent_strategy_ids, + strategy_id, + next_strategy_ids, + priority, + key, + next_keys, + queue_to_notify, + schedule_timeout_at + ) + SELECT + id, + inserted_at, + retry_count, + external_id, + tenant_id, + workflow_id, + workflow_version_id, + workflow_run_id, + parent_strategy_id, + next_parent_strategy_ids, + strategy_id, + next_strategy_ids, + COALESCE(priority, 1), + key, + next_keys, + queue, + schedule_timeout_at + FROM new_slot_rows; + + INSERT INTO v1_queue_item ( + tenant_id, + queue, + task_id, + task_inserted_at, + external_id, + action_id, + step_id, + workflow_id, + workflow_run_id, + schedule_timeout_at, + step_timeout, + priority, + sticky, + desired_worker_id, + retry_count + ) + SELECT + tenant_id, + queue, + id, + inserted_at, + external_id, + action_id, + step_id, + workflow_id, + workflow_run_id, + CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout), + step_timeout, + COALESCE(priority, 1), + sticky, + desired_worker_id, + retry_count + FROM new_table + WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NULL; + + INSERT INTO v1_dag_to_task ( + dag_id, + dag_inserted_at, + task_id, + task_inserted_at + ) + SELECT + dag_id, + dag_inserted_at, + id, + inserted_at + FROM new_table + WHERE dag_id IS NOT NULL AND dag_inserted_at IS NOT NULL; + + INSERT INTO v1_lookup_table ( + external_id, + tenant_id, + task_id, + inserted_at + ) + SELECT + external_id, + tenant_id, + id, + inserted_at + FROM new_table + ON CONFLICT (external_id) DO NOTHING; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION v1_task_insert_function() +RETURNS TRIGGER AS $$ +DECLARE + rec RECORD; +BEGIN + WITH new_slot_rows AS ( + SELECT + id, + inserted_at, + retry_count, + tenant_id, + priority, + concurrency_parent_strategy_ids[1] AS parent_strategy_id, + CASE + WHEN array_length(concurrency_parent_strategy_ids, 1) > 1 THEN concurrency_parent_strategy_ids[2:array_length(concurrency_parent_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_parent_strategy_ids, + concurrency_strategy_ids[1] AS strategy_id, + external_id, + workflow_run_id, + CASE + WHEN array_length(concurrency_strategy_ids, 1) > 1 THEN concurrency_strategy_ids[2:array_length(concurrency_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_strategy_ids, + concurrency_keys[1] AS key, + CASE + WHEN array_length(concurrency_keys, 1) > 1 THEN concurrency_keys[2:array_length(concurrency_keys, 1)] + ELSE '{}'::text[] + END AS next_keys, + workflow_id, + workflow_version_id, + queue, + CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout) AS schedule_timeout_at + FROM new_table + WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NOT NULL + ) + INSERT INTO v1_concurrency_slot ( + task_id, + task_inserted_at, + task_retry_count, + external_id, + tenant_id, + workflow_id, + workflow_version_id, + workflow_run_id, + parent_strategy_id, + next_parent_strategy_ids, + strategy_id, + next_strategy_ids, + priority, + key, + next_keys, + queue_to_notify, + schedule_timeout_at + ) + SELECT + id, + inserted_at, + retry_count, + external_id, + tenant_id, + workflow_id, + workflow_version_id, + workflow_run_id, + parent_strategy_id, + next_parent_strategy_ids, + strategy_id, + next_strategy_ids, + COALESCE(priority, 1), + key, + next_keys, + queue, + schedule_timeout_at + FROM new_slot_rows; + + INSERT INTO v1_queue_item ( + tenant_id, + queue, + task_id, + task_inserted_at, + external_id, + action_id, + step_id, + workflow_id, + workflow_run_id, + schedule_timeout_at, + step_timeout, + priority, + sticky, + desired_worker_id, + retry_count + ) + SELECT + tenant_id, + queue, + id, + inserted_at, + external_id, + action_id, + step_id, + workflow_id, + workflow_run_id, + CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout), + step_timeout, + COALESCE(priority, 1), + sticky, + desired_worker_id, + retry_count + FROM new_table + WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NULL; + + INSERT INTO v1_dag_to_task ( + dag_id, + dag_inserted_at, + task_id, + task_inserted_at + ) + SELECT + dag_id, + dag_inserted_at, + id, + inserted_at + FROM new_table + WHERE dag_id IS NOT NULL AND dag_inserted_at IS NOT NULL; + + INSERT INTO v1_lookup_table ( + external_id, + tenant_id, + task_id, + inserted_at + ) + SELECT + external_id, + tenant_id, + id, + inserted_at + FROM new_table + ON CONFLICT (external_id) DO NOTHING; + + -- NOTE: this comes after the insert into v1_dag_to_task and v1_lookup_table, because we case on these tables for cleanup + FOR rec IN SELECT UNNEST(concurrency_parent_strategy_ids) AS parent_strategy_id, workflow_version_id, workflow_run_id FROM new_table WHERE initial_state != 'QUEUED' ORDER BY parent_strategy_id, workflow_version_id, workflow_run_id LOOP + IF rec.parent_strategy_id IS NOT NULL THEN + PERFORM cleanup_workflow_concurrency_slots( + rec.parent_strategy_id, + rec.workflow_version_id, + rec.workflow_run_id + ); + END IF; + END LOOP; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; +-- +goose StatementEnd diff --git a/pkg/repository/v1/sqlcv1/concurrency.sql b/pkg/repository/v1/sqlcv1/concurrency.sql index 136944d07..1b307dfe0 100644 --- a/pkg/repository/v1/sqlcv1/concurrency.sql +++ b/pkg/repository/v1/sqlcv1/concurrency.sql @@ -283,18 +283,31 @@ FROM -- name: RunParentCancelInProgress :exec -WITH eligible_running_slots AS ( +WITH locked_workflow_concurrency_slots AS ( + SELECT * + FROM v1_workflow_concurrency_slot + WHERE (strategy_id, workflow_version_id, workflow_run_id) IN ( + SELECT + strategy_id, + workflow_version_id, + workflow_run_id + FROM + tmp_workflow_concurrency_slot + ) + ORDER BY strategy_id, workflow_version_id, workflow_run_id + FOR UPDATE +), eligible_running_slots AS ( SELECT wsc.* FROM ( SELECT DISTINCT key - FROM tmp_workflow_concurrency_slot + FROM locked_workflow_concurrency_slots WHERE tenant_id = @tenantId::uuid AND strategy_id = @strategyId::bigint ) distinct_keys JOIN LATERAL ( SELECT * - FROM tmp_workflow_concurrency_slot wcs_all + FROM locked_workflow_concurrency_slots wcs_all WHERE wcs_all.key = distinct_keys.key AND wcs_all.tenant_id = @tenantId::uuid @@ -510,18 +523,31 @@ FROM updated_slots; -- name: RunParentCancelNewest :exec -WITH eligible_running_slots AS ( +WITH locked_workflow_concurrency_slots AS ( + SELECT * + FROM v1_workflow_concurrency_slot + WHERE (strategy_id, workflow_version_id, workflow_run_id) IN ( + SELECT + strategy_id, + workflow_version_id, + workflow_run_id + FROM + tmp_workflow_concurrency_slot + ) + ORDER BY strategy_id, workflow_version_id, workflow_run_id + FOR UPDATE +), eligible_running_slots AS ( SELECT wsc.* FROM ( SELECT DISTINCT key - FROM tmp_workflow_concurrency_slot + FROM locked_workflow_concurrency_slots WHERE tenant_id = @tenantId::uuid AND strategy_id = @strategyId::bigint ) distinct_keys JOIN LATERAL ( SELECT * - FROM tmp_workflow_concurrency_slot wcs_all + FROM locked_workflow_concurrency_slots wcs_all WHERE wcs_all.key = distinct_keys.key AND wcs_all.tenant_id = @tenantId::uuid diff --git a/pkg/repository/v1/sqlcv1/concurrency.sql.go b/pkg/repository/v1/sqlcv1/concurrency.sql.go index 7b6332d71..d3f6af1e1 100644 --- a/pkg/repository/v1/sqlcv1/concurrency.sql.go +++ b/pkg/repository/v1/sqlcv1/concurrency.sql.go @@ -830,18 +830,31 @@ func (q *Queries) RunGroupRoundRobin(ctx context.Context, db DBTX, arg RunGroupR } const runParentCancelInProgress = `-- name: RunParentCancelInProgress :exec -WITH eligible_running_slots AS ( +WITH locked_workflow_concurrency_slots AS ( + SELECT sort_id, tenant_id, workflow_id, workflow_version_id, workflow_run_id, strategy_id, completed_child_strategy_ids, child_strategy_ids, priority, key, is_filled + FROM v1_workflow_concurrency_slot + WHERE (strategy_id, workflow_version_id, workflow_run_id) IN ( + SELECT + strategy_id, + workflow_version_id, + workflow_run_id + FROM + tmp_workflow_concurrency_slot + ) + ORDER BY strategy_id, workflow_version_id, workflow_run_id + FOR UPDATE +), eligible_running_slots AS ( SELECT wsc.sort_id, wsc.tenant_id, wsc.workflow_id, wsc.workflow_version_id, wsc.workflow_run_id, wsc.strategy_id, wsc.completed_child_strategy_ids, wsc.child_strategy_ids, wsc.priority, wsc.key, wsc.is_filled FROM ( SELECT DISTINCT key - FROM tmp_workflow_concurrency_slot + FROM locked_workflow_concurrency_slots WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint ) distinct_keys JOIN LATERAL ( SELECT sort_id, tenant_id, workflow_id, workflow_version_id, workflow_run_id, strategy_id, completed_child_strategy_ids, child_strategy_ids, priority, key, is_filled - FROM tmp_workflow_concurrency_slot wcs_all + FROM locked_workflow_concurrency_slots wcs_all WHERE wcs_all.key = distinct_keys.key AND wcs_all.tenant_id = $1::uuid @@ -902,18 +915,31 @@ func (q *Queries) RunParentCancelInProgress(ctx context.Context, db DBTX, arg Ru } const runParentCancelNewest = `-- name: RunParentCancelNewest :exec -WITH eligible_running_slots AS ( +WITH locked_workflow_concurrency_slots AS ( + SELECT sort_id, tenant_id, workflow_id, workflow_version_id, workflow_run_id, strategy_id, completed_child_strategy_ids, child_strategy_ids, priority, key, is_filled + FROM v1_workflow_concurrency_slot + WHERE (strategy_id, workflow_version_id, workflow_run_id) IN ( + SELECT + strategy_id, + workflow_version_id, + workflow_run_id + FROM + tmp_workflow_concurrency_slot + ) + ORDER BY strategy_id, workflow_version_id, workflow_run_id + FOR UPDATE +), eligible_running_slots AS ( SELECT wsc.sort_id, wsc.tenant_id, wsc.workflow_id, wsc.workflow_version_id, wsc.workflow_run_id, wsc.strategy_id, wsc.completed_child_strategy_ids, wsc.child_strategy_ids, wsc.priority, wsc.key, wsc.is_filled FROM ( SELECT DISTINCT key - FROM tmp_workflow_concurrency_slot + FROM locked_workflow_concurrency_slots WHERE tenant_id = $1::uuid AND strategy_id = $2::bigint ) distinct_keys JOIN LATERAL ( SELECT sort_id, tenant_id, workflow_id, workflow_version_id, workflow_run_id, strategy_id, completed_child_strategy_ids, child_strategy_ids, priority, key, is_filled - FROM tmp_workflow_concurrency_slot wcs_all + FROM locked_workflow_concurrency_slots wcs_all WHERE wcs_all.key = distinct_keys.key AND wcs_all.tenant_id = $1::uuid diff --git a/pkg/repository/v1/sqlcv1/matches.sql b/pkg/repository/v1/sqlcv1/matches.sql index a35b89bda..dae4cec21 100644 --- a/pkg/repository/v1/sqlcv1/matches.sql +++ b/pkg/repository/v1/sqlcv1/matches.sql @@ -65,14 +65,19 @@ INSERT INTO v1_match_condition ( -- on the same target table without using RETURNING. WITH input AS ( SELECT - * + UNNEST(@matchIds::BIGINT[]) AS match_id, + UNNEST(@conditionIds::BIGINT[]) AS condition_id, + UNNEST(@datas::JSONB[]) AS data +), locked_matches AS ( + SELECT + m.id FROM - ( - SELECT - unnest(@matchIds::bigint[]) AS match_id, - unnest(@conditionIds::bigint[]) AS condition_id, - unnest(@datas::jsonb[]) AS data - ) AS subquery + v1_match m + WHERE + m.id = ANY(@matchIds::BIGINT[]) + ORDER BY + m.id + FOR UPDATE ), locked_conditions AS ( SELECT m.v1_match_id, @@ -82,6 +87,8 @@ WITH input AS ( v1_match_condition m JOIN input i ON i.match_id = m.v1_match_id AND i.condition_id = m.id + JOIN + locked_matches lm ON lm.id = m.v1_match_id ORDER BY m.id FOR UPDATE @@ -97,21 +104,11 @@ WITH input AS ( (v1_match_condition.v1_match_id, v1_match_condition.id) = (c.v1_match_id, c.id) RETURNING v1_match_condition.v1_match_id, v1_match_condition.id -), distinct_match_ids AS ( - SELECT - DISTINCT v1_match_id - FROM - updated_conditions ) SELECT m.id FROM - v1_match m -JOIN - distinct_match_ids dm ON dm.v1_match_id = m.id -ORDER BY - m.id -FOR UPDATE; + locked_matches m; -- name: SaveSatisfiedMatchConditions :many -- NOTE: we have to break this into a separate query because CTEs can't see modified rows diff --git a/pkg/repository/v1/sqlcv1/matches.sql.go b/pkg/repository/v1/sqlcv1/matches.sql.go index 2b97cf5bb..a393f6638 100644 --- a/pkg/repository/v1/sqlcv1/matches.sql.go +++ b/pkg/repository/v1/sqlcv1/matches.sql.go @@ -124,14 +124,19 @@ func (q *Queries) CreateMatchesForSignalTriggers(ctx context.Context, db DBTX, a const getSatisfiedMatchConditions = `-- name: GetSatisfiedMatchConditions :many WITH input AS ( SELECT - match_id, condition_id, data + UNNEST($1::BIGINT[]) AS match_id, + UNNEST($2::BIGINT[]) AS condition_id, + UNNEST($3::JSONB[]) AS data +), locked_matches AS ( + SELECT + m.id FROM - ( - SELECT - unnest($1::bigint[]) AS match_id, - unnest($2::bigint[]) AS condition_id, - unnest($3::jsonb[]) AS data - ) AS subquery + v1_match m + WHERE + m.id = ANY($1::BIGINT[]) + ORDER BY + m.id + FOR UPDATE ), locked_conditions AS ( SELECT m.v1_match_id, @@ -141,6 +146,8 @@ WITH input AS ( v1_match_condition m JOIN input i ON i.match_id = m.v1_match_id AND i.condition_id = m.id + JOIN + locked_matches lm ON lm.id = m.v1_match_id ORDER BY m.id FOR UPDATE @@ -156,21 +163,11 @@ WITH input AS ( (v1_match_condition.v1_match_id, v1_match_condition.id) = (c.v1_match_id, c.id) RETURNING v1_match_condition.v1_match_id, v1_match_condition.id -), distinct_match_ids AS ( - SELECT - DISTINCT v1_match_id - FROM - updated_conditions ) SELECT m.id FROM - v1_match m -JOIN - distinct_match_ids dm ON dm.v1_match_id = m.id -ORDER BY - m.id -FOR UPDATE + locked_matches m ` type GetSatisfiedMatchConditionsParams struct { diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index 16e6b65ff..0f950d51b 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -987,3 +987,22 @@ WHERE (v1_task_runtime.task_id, v1_task_runtime.task_inserted_at, v1_task_runtime.retry_count) IN (SELECT id, inserted_at, retry_count FROM task) RETURNING v1_task_runtime.*; + +-- name: CleanupWorkflowConcurrencySlotsAfterInsert :exec +-- Cleans up workflow concurrency slots when tasks have been inserted in a non-QUEUED state. +-- NOTE: this comes after the insert into v1_dag_to_task and v1_lookup_table, because we case on these tables for cleanup +WITH input AS ( + SELECT + UNNEST(@concurrencyParentStrategyIds::bigint[]) AS parent_strategy_id, + UNNEST(@workflowVersionIds::uuid[]) AS workflow_version_id, + UNNEST(@workflowRunIds::uuid[]) AS workflow_run_id + ORDER BY parent_strategy_id, workflow_version_id, workflow_run_id +) +SELECT + cleanup_workflow_concurrency_slots( + rec.parent_strategy_id, + rec.workflow_version_id, + rec.workflow_run_id + ) +FROM + input rec; diff --git a/pkg/repository/v1/sqlcv1/tasks.sql.go b/pkg/repository/v1/sqlcv1/tasks.sql.go index 79ed4722c..b6f268563 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql.go +++ b/pkg/repository/v1/sqlcv1/tasks.sql.go @@ -11,6 +11,37 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const cleanupWorkflowConcurrencySlotsAfterInsert = `-- name: CleanupWorkflowConcurrencySlotsAfterInsert :exec +WITH input AS ( + SELECT + UNNEST($1::bigint[]) AS parent_strategy_id, + UNNEST($2::uuid[]) AS workflow_version_id, + UNNEST($3::uuid[]) AS workflow_run_id + ORDER BY parent_strategy_id, workflow_version_id, workflow_run_id +) +SELECT + cleanup_workflow_concurrency_slots( + rec.parent_strategy_id, + rec.workflow_version_id, + rec.workflow_run_id + ) +FROM + input rec +` + +type CleanupWorkflowConcurrencySlotsAfterInsertParams struct { + Concurrencyparentstrategyids []int64 `json:"concurrencyparentstrategyids"` + Workflowversionids []pgtype.UUID `json:"workflowversionids"` + Workflowrunids []pgtype.UUID `json:"workflowrunids"` +} + +// Cleans up workflow concurrency slots when tasks have been inserted in a non-QUEUED state. +// NOTE: this comes after the insert into v1_dag_to_task and v1_lookup_table, because we case on these tables for cleanup +func (q *Queries) CleanupWorkflowConcurrencySlotsAfterInsert(ctx context.Context, db DBTX, arg CleanupWorkflowConcurrencySlotsAfterInsertParams) error { + _, err := db.Exec(ctx, cleanupWorkflowConcurrencySlotsAfterInsert, arg.Concurrencyparentstrategyids, arg.Workflowversionids, arg.Workflowrunids) + return err +} + const createPartitions = `-- name: CreatePartitions :exec SELECT create_v1_range_partition('v1_task', $1::date), diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 1d1ed3e35..10d524159 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -1651,6 +1651,10 @@ func (r *sharedRepository) insertTasks( unix := time.Now().UnixMilli() + cleanupParentStrategyIds := make([]int64, 0) + cleanupWorkflowVersionIds := make([]pgtype.UUID, 0) + cleanupWorkflowRunIds := make([]pgtype.UUID, 0) + for i, task := range tasks { stepConfig := stepIdsToConfig[task.StepId] tenantIds[i] = sqlchelpers.UUIDFromStr(tenantId) @@ -1757,6 +1761,14 @@ func (r *sharedRepository) insertTasks( taskStrategyIds = append(taskStrategyIds, strat.ID) taskParentStrategyIds = append(taskParentStrategyIds, strat.ParentStrategyID) emptyConcurrencyKeys = append(emptyConcurrencyKeys, "") + + // we only need to cleanup parent strategy ids if the task is not in a QUEUED state, because + // this skips the creation of a concurrency slot and means we might want to cleanup the workflow slot + if strat.ParentStrategyID.Valid && task.InitialState != sqlcv1.V1TaskInitialStateQUEUED { + cleanupParentStrategyIds = append(cleanupParentStrategyIds, strat.ParentStrategyID.Int64) + cleanupWorkflowRunIds = append(cleanupWorkflowRunIds, sqlchelpers.UUIDFromStr(task.WorkflowRunId)) + cleanupWorkflowVersionIds = append(cleanupWorkflowVersionIds, stepConfig.WorkflowVersionId) + } } } @@ -2050,6 +2062,22 @@ func (r *sharedRepository) insertTasks( } } + if len(cleanupParentStrategyIds) > 0 { + err = r.queries.CleanupWorkflowConcurrencySlotsAfterInsert( + ctx, + tx, + sqlcv1.CleanupWorkflowConcurrencySlotsAfterInsertParams{ + Concurrencyparentstrategyids: cleanupParentStrategyIds, + Workflowrunids: cleanupWorkflowRunIds, + Workflowversionids: cleanupWorkflowVersionIds, + }, + ) + + if err != nil { + return nil, fmt.Errorf("failed to cleanup workflow concurrency slots after insert: %w", err) + } + } + // TODO: this should be moved to after the transaction commits saveQueueCache() diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index 6e16df878..11762fc56 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -1086,17 +1086,6 @@ BEGIN FROM new_table ON CONFLICT (external_id) DO NOTHING; - -- NOTE: this comes after the insert into v1_dag_to_task and v1_lookup_table, because we case on these tables for cleanup - FOR rec IN SELECT UNNEST(concurrency_parent_strategy_ids) AS parent_strategy_id, workflow_version_id, workflow_run_id FROM new_table WHERE initial_state != 'QUEUED' ORDER BY parent_strategy_id, workflow_version_id, workflow_run_id LOOP - IF rec.parent_strategy_id IS NOT NULL THEN - PERFORM cleanup_workflow_concurrency_slots( - rec.parent_strategy_id, - rec.workflow_version_id, - rec.workflow_run_id - ); - END IF; - END LOOP; - RETURN NULL; END; $$