diff --git a/pkg/repository/v1/scheduler_concurrency.go b/pkg/repository/v1/scheduler_concurrency.go index 2f56df69f..3b32b3467 100644 --- a/pkg/repository/v1/scheduler_concurrency.go +++ b/pkg/repository/v1/scheduler_concurrency.go @@ -60,7 +60,7 @@ func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive( tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) error { - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 30000) if err != nil { return err @@ -136,7 +136,7 @@ func (c *ConcurrencyRepositoryImpl) runGroupRoundRobin( strategy *sqlcv1.V1StepConcurrency, ) (res *RunConcurrencyResult, err error) { - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 30000) if err != nil { return nil, fmt.Errorf("failed to prepare transaction (strategy ID: %d): %w", strategy.ID, err) @@ -277,7 +277,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelInProgress( strategy *sqlcv1.V1StepConcurrency, ) (res *RunConcurrencyResult, err error) { - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 30000) if err != nil { return nil, fmt.Errorf("failed to prepare transaction (strategy ID: %d): %w", strategy.ID, err) @@ -496,7 +496,7 @@ func (c *ConcurrencyRepositoryImpl) runCancelNewest( tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) (res *RunConcurrencyResult, err error) { - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, c.pool, c.l, 30000) if err != nil { return nil, fmt.Errorf("failed to prepare transaction (strategy ID: %d): %w", strategy.ID, err) diff --git a/pkg/repository/v1/sqlcv1/concurrency-overwrite.sql.go b/pkg/repository/v1/sqlcv1/concurrency-overwrite.sql.go index 47682cc78..652ae3b8d 100644 --- a/pkg/repository/v1/sqlcv1/concurrency-overwrite.sql.go +++ b/pkg/repository/v1/sqlcv1/concurrency-overwrite.sql.go @@ -39,6 +39,7 @@ WITH filled_parent_slots AS ( ORDER BY task_id, task_inserted_at FOR UPDATE + LIMIT 1000 ), eligible_slots AS ( SELECT cs.sort_id, cs.task_id, cs.task_inserted_at, cs.task_retry_count, cs.external_id, cs.tenant_id, cs.workflow_id, cs.workflow_version_id, cs.workflow_run_id, cs.strategy_id, cs.parent_strategy_id, cs.priority, cs.key, cs.is_filled, cs.next_parent_strategy_ids, cs.next_strategy_ids, cs.next_keys, cs.queue_to_notify, cs.schedule_timeout_at @@ -207,6 +208,7 @@ WITH slots AS ( strategy_id = $2::bigint AND schedule_timeout_at < NOW() AND is_filled = FALSE + LIMIT 1000 ), eligible_running_slots AS ( SELECT task_id, @@ -429,6 +431,7 @@ WITH slots AS ( strategy_id = $2::bigint AND schedule_timeout_at < NOW() AND is_filled = FALSE + LIMIT 1000 ), eligible_running_slots AS ( SELECT task_id,