From f62142f74de6d6f15842a051ff3bb9b0076b172e Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Tue, 26 Aug 2025 11:06:55 -0400 Subject: [PATCH] fix: explicit ordering in ReleaseTasks and lock parent slots (#2201) * fix: explicit ordering in ReleaseTasks and lock parent slots * fix: IN instead of = * fix: gen diff --- pkg/repository/v1/sqlcv1/batch.go | 61 +++++ pkg/repository/v1/sqlcv1/db.go | 1 + pkg/repository/v1/sqlcv1/tasks-overwrite.go | 286 ++++++++++++++++++++ pkg/repository/v1/sqlcv1/tasks.sql | 117 +------- pkg/repository/v1/sqlcv1/tasks.sql.go | 164 ----------- 5 files changed, 352 insertions(+), 277 deletions(-) create mode 100644 pkg/repository/v1/sqlcv1/batch.go diff --git a/pkg/repository/v1/sqlcv1/batch.go b/pkg/repository/v1/sqlcv1/batch.go new file mode 100644 index 000000000..8c094d4b1 --- /dev/null +++ b/pkg/repository/v1/sqlcv1/batch.go @@ -0,0 +1,61 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 +// source: batch.go + +package sqlcv1 + +import ( + "context" + "errors" + + "github.com/jackc/pgx/v5" +) + +var ( + ErrBatchAlreadyClosed = errors.New("batch already closed") +) + +const registerBatch = `-- name: RegisterBatch :batchexec +SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff FROM v1_task WHERE id = $1 +` + +type RegisterBatchBatchResults struct { + br pgx.BatchResults + tot int + closed bool +} + +// DO NOT USE: dummy query to satisfy sqlc and register Batch calls on DBTX +func (q *Queries) RegisterBatch(ctx context.Context, db DBTX, id []int64) *RegisterBatchBatchResults { + batch := &pgx.Batch{} + for _, a := range id { + vals := []interface{}{ + a, + } + batch.Queue(registerBatch, vals...) + } + br := db.SendBatch(ctx, batch) + return &RegisterBatchBatchResults{br, len(id), false} +} + +func (b *RegisterBatchBatchResults) Exec(f func(int, error)) { + defer b.br.Close() + for t := 0; t < b.tot; t++ { + if b.closed { + if f != nil { + f(t, ErrBatchAlreadyClosed) + } + continue + } + _, err := b.br.Exec() + if f != nil { + f(t, err) + } + } +} + +func (b *RegisterBatchBatchResults) Close() error { + b.closed = true + return b.br.Close() +} diff --git a/pkg/repository/v1/sqlcv1/db.go b/pkg/repository/v1/sqlcv1/db.go index dbcb3c901..bd6758fb4 100644 --- a/pkg/repository/v1/sqlcv1/db.go +++ b/pkg/repository/v1/sqlcv1/db.go @@ -16,6 +16,7 @@ type DBTX interface { Query(context.Context, string, ...interface{}) (pgx.Rows, error) QueryRow(context.Context, string, ...interface{}) pgx.Row CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) + SendBatch(context.Context, *pgx.Batch) pgx.BatchResults } func New() *Queries { diff --git a/pkg/repository/v1/sqlcv1/tasks-overwrite.go b/pkg/repository/v1/sqlcv1/tasks-overwrite.go index 88919f3be..3f8000d5a 100644 --- a/pkg/repository/v1/sqlcv1/tasks-overwrite.go +++ b/pkg/repository/v1/sqlcv1/tasks-overwrite.go @@ -3,6 +3,7 @@ package sqlcv1 import ( "context" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" ) @@ -506,3 +507,288 @@ func (q *Queries) CreateTaskExpressionEvals(ctx context.Context, db DBTX, arg Cr ) return err } + +const lockParentConcurrencySlots = `-- name: LockParentConcurrencySlots :batchexec +WITH input AS ( + SELECT + task_id, task_inserted_at, retry_count + FROM + ( + SELECT + unnest($1::bigint[]) AS task_id, + unnest($2::timestamptz[]) AS task_inserted_at, + unnest($3::integer[]) AS retry_count + ) AS subquery +), concurrency_slots_to_delete AS ( + SELECT + task_id, task_inserted_at, task_retry_count, parent_strategy_id, workflow_version_id, workflow_run_id + FROM + v1_concurrency_slot + WHERE + (task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) +) +SELECT + sort_id, tenant_id, workflow_id, workflow_version_id, workflow_run_id, strategy_id, completed_child_strategy_ids, child_strategy_ids, priority, key, is_filled +FROM + v1_workflow_concurrency_slot wcs +WHERE + (wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id) IN ( + SELECT parent_strategy_id, workflow_version_id, workflow_run_id FROM concurrency_slots_to_delete + ) +ORDER BY + wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id +FOR UPDATE +` + +const releaseConcurrencySlots = `-- name: ReleaseConcurrencySlots :batchexec +WITH input AS ( + SELECT + task_id, task_inserted_at, retry_count + FROM + ( + SELECT + unnest($1::bigint[]) AS task_id, + unnest($2::timestamptz[]) AS task_inserted_at, + unnest($3::integer[]) AS retry_count + ) AS subquery +), concurrency_slots_to_delete AS ( + SELECT + task_id, task_inserted_at, task_retry_count + FROM + v1_concurrency_slot + WHERE + (task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) + ORDER BY + task_id, task_inserted_at, task_retry_count + FOR UPDATE +) +DELETE FROM + v1_concurrency_slot +WHERE + (task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, task_retry_count FROM concurrency_slots_to_delete) +` + +const releaseQueueItems = `-- name: ReleaseQueueItems :batchexec +WITH input AS ( + SELECT + task_id, task_inserted_at, retry_count + FROM + ( + SELECT + unnest($1::bigint[]) AS task_id, + unnest($2::timestamptz[]) AS task_inserted_at, + unnest($3::integer[]) AS retry_count + ) AS subquery +), queue_items_to_delete AS ( + SELECT + task_id, task_inserted_at, retry_count + FROM + v1_queue_item + WHERE + (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) + ORDER BY + task_id, task_inserted_at, retry_count + FOR UPDATE +) +DELETE FROM + v1_queue_item +WHERE + (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM queue_items_to_delete) +` + +const releaseRateLimitedQueueItems = `-- name: ReleaseRateLimitedQueueItems :batchexec +WITH input AS ( + SELECT + task_id, task_inserted_at, retry_count + FROM + ( + SELECT + unnest($1::bigint[]) AS task_id, + unnest($2::timestamptz[]) AS task_inserted_at, + unnest($3::integer[]) AS retry_count + ) AS subquery +), rate_limited_items_to_delete AS ( + SELECT + task_id, task_inserted_at, retry_count + FROM + v1_rate_limited_queue_items + WHERE + (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) + ORDER BY + task_id, task_inserted_at, retry_count + FOR UPDATE +) +DELETE FROM + v1_rate_limited_queue_items +WHERE + (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM rate_limited_items_to_delete) +` + +const releaseRetryQueueItems = `-- name: ReleaseRetryQueueItems :batchexec +WITH input AS ( + SELECT + task_id, task_inserted_at, retry_count + FROM + ( + SELECT + unnest($1::bigint[]) AS task_id, + unnest($2::timestamptz[]) AS task_inserted_at, + unnest($3::integer[]) AS retry_count + ) AS subquery +), retry_queue_items_to_delete AS ( + SELECT + task_id, task_inserted_at, task_retry_count + FROM + v1_retry_queue_item + WHERE + (task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) + ORDER BY + task_id, task_inserted_at, task_retry_count + FOR UPDATE +) +DELETE FROM + v1_retry_queue_item r +WHERE + (task_id, task_inserted_at, task_retry_count) IN ( + SELECT + task_id, task_inserted_at, task_retry_count + FROM + retry_queue_items_to_delete + ) +` + +const releaseTasks = `-- name: ReleaseTasks :batchmany +WITH input AS ( + SELECT + task_id, task_inserted_at, retry_count + FROM + ( + SELECT + unnest($1::bigint[]) AS task_id, + unnest($2::timestamptz[]) AS task_inserted_at, + unnest($3::integer[]) AS retry_count + ) AS subquery +), runtimes_to_delete AS ( + SELECT + task_id, + task_inserted_at, + retry_count, + worker_id + FROM + v1_task_runtime + WHERE + (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) + ORDER BY + task_id, task_inserted_at, retry_count + FOR UPDATE +), deleted_runtimes AS ( + DELETE FROM + v1_task_runtime + WHERE + (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM runtimes_to_delete) + -- return a constant for ordering + RETURNING 1 AS cte_order +) +SELECT + t.queue, + t.id, + t.inserted_at, + t.external_id, + t.step_readable_id, + t.workflow_run_id, + r.worker_id, + i.retry_count::int AS retry_count, + t.retry_count = i.retry_count AS is_current_retry, + t.concurrency_strategy_ids +FROM + v1_task t +JOIN + input i ON i.task_id = t.id AND i.task_inserted_at = t.inserted_at +LEFT JOIN + runtimes_to_delete r ON r.task_id = t.id AND r.retry_count = t.retry_count +` + +type ReleaseTasksParams struct { + Taskids []int64 `json:"taskids"` + Taskinsertedats []pgtype.Timestamptz `json:"taskinsertedats"` + Retrycounts []int32 `json:"retrycounts"` +} + +type ReleaseTasksRow struct { + Queue string `json:"queue"` + ID int64 `json:"id"` + InsertedAt pgtype.Timestamptz `json:"inserted_at"` + ExternalID pgtype.UUID `json:"external_id"` + StepReadableID string `json:"step_readable_id"` + WorkflowRunID pgtype.UUID `json:"workflow_run_id"` + WorkerID pgtype.UUID `json:"worker_id"` + RetryCount int32 `json:"retry_count"` + IsCurrentRetry bool `json:"is_current_retry"` + ConcurrencyStrategyIds []int64 `json:"concurrency_strategy_ids"` +} + +func (q *Queries) ReleaseTasks(ctx context.Context, db DBTX, arg ReleaseTasksParams) ([]*ReleaseTasksRow, error) { + batch := &pgx.Batch{} + vals := []interface{}{ + arg.Taskids, + arg.Taskinsertedats, + arg.Retrycounts, + } + + rowsCh := make(chan *ReleaseTasksRow, len(arg.Taskids)) + errCh := make(chan error, 1) + + res := batch.Queue(releaseTasks, vals...) + + res.Query(func(rows pgx.Rows) error { + for rows.Next() { + var i ReleaseTasksRow + if err := rows.Scan( + &i.Queue, + &i.ID, + &i.InsertedAt, + &i.ExternalID, + &i.StepReadableID, + &i.WorkflowRunID, + &i.WorkerID, + &i.RetryCount, + &i.IsCurrentRetry, + &i.ConcurrencyStrategyIds, + ); err != nil { + errCh <- err + close(rowsCh) + close(errCh) + return err + } + rowsCh <- &i + } + errCh <- rows.Err() + close(rowsCh) + close(errCh) + return nil + }) + batch.Queue(releaseRetryQueueItems, vals...) + batch.Queue(releaseQueueItems, vals...) + batch.Queue(lockParentConcurrencySlots, vals...) + batch.Queue(releaseConcurrencySlots, vals...) + batch.Queue(releaseRateLimitedQueueItems, vals...) + + br := db.SendBatch(ctx, batch) + err := br.Close() + + if err != nil { + return nil, err + } + + var items []*ReleaseTasksRow + + for r := range rowsCh { + items = append(items, r) + } + + if err := <-errCh; err != nil { + return nil, err + } + + return items, nil +} diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index 0f950d51b..259770c6a 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -161,119 +161,6 @@ WHERE tenant_id = $1 AND id = ANY(@ids::bigint[]); --- name: ReleaseTasks :many -WITH input AS ( - SELECT - * - FROM - ( - SELECT - unnest(@taskIds::bigint[]) AS task_id, - unnest(@taskInsertedAts::timestamptz[]) AS task_inserted_at, - unnest(@retryCounts::integer[]) AS retry_count - ) AS subquery -), runtimes_to_delete AS ( - SELECT - task_id, - task_inserted_at, - retry_count, - worker_id - FROM - v1_task_runtime - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) - ORDER BY - task_id, task_inserted_at, retry_count - FOR UPDATE -), deleted_runtimes AS ( - DELETE FROM - v1_task_runtime - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM runtimes_to_delete) -), retry_queue_items_to_delete AS ( - SELECT - task_id, task_inserted_at, task_retry_count - FROM - v1_retry_queue_item - WHERE - (task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) - ORDER BY - task_id, task_inserted_at, task_retry_count - FOR UPDATE -), deleted_rqis AS ( - DELETE FROM - v1_retry_queue_item r - WHERE - (task_id, task_inserted_at, task_retry_count) IN ( - SELECT - task_id, task_inserted_at, task_retry_count - FROM - retry_queue_items_to_delete - ) -), queue_items_to_delete AS ( - SELECT - task_id, task_inserted_at, retry_count - FROM - v1_queue_item - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) - ORDER BY - task_id, task_inserted_at, retry_count - FOR UPDATE -), deleted_qis AS ( - DELETE FROM - v1_queue_item - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM queue_items_to_delete) -), concurrency_slots_to_delete AS ( - SELECT - task_id, task_inserted_at, task_retry_count - FROM - v1_concurrency_slot - WHERE - (task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) - ORDER BY - task_id, task_inserted_at, task_retry_count - FOR UPDATE -), deleted_slots AS ( - DELETE FROM - v1_concurrency_slot - WHERE - (task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, task_retry_count FROM concurrency_slots_to_delete) -), rate_limited_items_to_delete AS ( - SELECT - task_id, task_inserted_at, retry_count - FROM - v1_rate_limited_queue_items - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) - ORDER BY - task_id, task_inserted_at, retry_count - FOR UPDATE -), deleted_rate_limited AS ( - DELETE FROM - v1_rate_limited_queue_items - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM rate_limited_items_to_delete) -) -SELECT - t.queue, - t.id, - t.inserted_at, - t.external_id, - t.step_readable_id, - t.workflow_run_id, - r.worker_id, - i.retry_count::int AS retry_count, - t.retry_count = i.retry_count AS is_current_retry, - t.concurrency_strategy_ids -FROM - v1_task t -JOIN - input i ON i.task_id = t.id AND i.task_inserted_at = t.inserted_at -LEFT JOIN - runtimes_to_delete r ON r.task_id = t.id AND r.retry_count = t.retry_count; - -- name: FailTaskAppFailure :many -- Fails a task due to an application-level error WITH input AS ( @@ -1006,3 +893,7 @@ SELECT ) FROM input rec; + +-- name: RegisterBatch :batchexec +-- DO NOT USE: dummy query to satisfy sqlc and register Batch calls on DBTX +SELECT * FROM v1_task WHERE id = $1; diff --git a/pkg/repository/v1/sqlcv1/tasks.sql.go b/pkg/repository/v1/sqlcv1/tasks.sql.go index b6f268563..6b89c0bec 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql.go +++ b/pkg/repository/v1/sqlcv1/tasks.sql.go @@ -1860,167 +1860,3 @@ func (q *Queries) RefreshTimeoutBy(ctx context.Context, db DBTX, arg RefreshTime ) return &i, err } - -const releaseTasks = `-- name: ReleaseTasks :many -WITH input AS ( - SELECT - task_id, task_inserted_at, retry_count - FROM - ( - SELECT - unnest($1::bigint[]) AS task_id, - unnest($2::timestamptz[]) AS task_inserted_at, - unnest($3::integer[]) AS retry_count - ) AS subquery -), runtimes_to_delete AS ( - SELECT - task_id, - task_inserted_at, - retry_count, - worker_id - FROM - v1_task_runtime - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) - ORDER BY - task_id, task_inserted_at, retry_count - FOR UPDATE -), deleted_runtimes AS ( - DELETE FROM - v1_task_runtime - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM runtimes_to_delete) -), retry_queue_items_to_delete AS ( - SELECT - task_id, task_inserted_at, task_retry_count - FROM - v1_retry_queue_item - WHERE - (task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) - ORDER BY - task_id, task_inserted_at, task_retry_count - FOR UPDATE -), deleted_rqis AS ( - DELETE FROM - v1_retry_queue_item r - WHERE - (task_id, task_inserted_at, task_retry_count) IN ( - SELECT - task_id, task_inserted_at, task_retry_count - FROM - retry_queue_items_to_delete - ) -), queue_items_to_delete AS ( - SELECT - task_id, task_inserted_at, retry_count - FROM - v1_queue_item - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) - ORDER BY - task_id, task_inserted_at, retry_count - FOR UPDATE -), deleted_qis AS ( - DELETE FROM - v1_queue_item - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM queue_items_to_delete) -), concurrency_slots_to_delete AS ( - SELECT - task_id, task_inserted_at, task_retry_count - FROM - v1_concurrency_slot - WHERE - (task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) - ORDER BY - task_id, task_inserted_at, task_retry_count - FOR UPDATE -), deleted_slots AS ( - DELETE FROM - v1_concurrency_slot - WHERE - (task_id, task_inserted_at, task_retry_count) IN (SELECT task_id, task_inserted_at, task_retry_count FROM concurrency_slots_to_delete) -), rate_limited_items_to_delete AS ( - SELECT - task_id, task_inserted_at, retry_count - FROM - v1_rate_limited_queue_items - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM input) - ORDER BY - task_id, task_inserted_at, retry_count - FOR UPDATE -), deleted_rate_limited AS ( - DELETE FROM - v1_rate_limited_queue_items - WHERE - (task_id, task_inserted_at, retry_count) IN (SELECT task_id, task_inserted_at, retry_count FROM rate_limited_items_to_delete) -) -SELECT - t.queue, - t.id, - t.inserted_at, - t.external_id, - t.step_readable_id, - t.workflow_run_id, - r.worker_id, - i.retry_count::int AS retry_count, - t.retry_count = i.retry_count AS is_current_retry, - t.concurrency_strategy_ids -FROM - v1_task t -JOIN - input i ON i.task_id = t.id AND i.task_inserted_at = t.inserted_at -LEFT JOIN - runtimes_to_delete r ON r.task_id = t.id AND r.retry_count = t.retry_count -` - -type ReleaseTasksParams struct { - Taskids []int64 `json:"taskids"` - Taskinsertedats []pgtype.Timestamptz `json:"taskinsertedats"` - Retrycounts []int32 `json:"retrycounts"` -} - -type ReleaseTasksRow struct { - Queue string `json:"queue"` - ID int64 `json:"id"` - InsertedAt pgtype.Timestamptz `json:"inserted_at"` - ExternalID pgtype.UUID `json:"external_id"` - StepReadableID string `json:"step_readable_id"` - WorkflowRunID pgtype.UUID `json:"workflow_run_id"` - WorkerID pgtype.UUID `json:"worker_id"` - RetryCount int32 `json:"retry_count"` - IsCurrentRetry bool `json:"is_current_retry"` - ConcurrencyStrategyIds []int64 `json:"concurrency_strategy_ids"` -} - -func (q *Queries) ReleaseTasks(ctx context.Context, db DBTX, arg ReleaseTasksParams) ([]*ReleaseTasksRow, error) { - rows, err := db.Query(ctx, releaseTasks, arg.Taskids, arg.Taskinsertedats, arg.Retrycounts) - if err != nil { - return nil, err - } - defer rows.Close() - var items []*ReleaseTasksRow - for rows.Next() { - var i ReleaseTasksRow - if err := rows.Scan( - &i.Queue, - &i.ID, - &i.InsertedAt, - &i.ExternalID, - &i.StepReadableID, - &i.WorkflowRunID, - &i.WorkerID, - &i.RetryCount, - &i.IsCurrentRetry, - &i.ConcurrencyStrategyIds, - ); err != nil { - return nil, err - } - items = append(items, &i) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -}