fix: reset scheduling timeouts on replay (#1693)

This commit is contained in:
abelanger5
2025-05-07 20:38:51 -04:00
committed by GitHub
parent d56da1aa4c
commit 858f4d9b82
2 changed files with 361 additions and 0 deletions
@@ -0,0 +1,360 @@
-- +goose Up
-- +goose StatementBegin
CREATE OR REPLACE FUNCTION v1_task_update_function()
RETURNS TRIGGER AS
$$
BEGIN
WITH new_retry_rows AS (
SELECT
nt.id,
nt.inserted_at,
nt.retry_count,
nt.tenant_id,
-- Convert the retry_after based on min(retry_backoff_factor ^ retry_count, retry_max_backoff)
NOW() + (LEAST(nt.retry_max_backoff, POWER(nt.retry_backoff_factor, nt.app_retry_count)) * interval '1 second') AS retry_after
FROM new_table nt
JOIN old_table ot ON ot.id = nt.id
WHERE nt.initial_state = 'QUEUED'
AND nt.retry_backoff_factor IS NOT NULL
AND ot.app_retry_count IS DISTINCT FROM nt.app_retry_count
AND nt.app_retry_count != 0
)
INSERT INTO v1_retry_queue_item (
task_id,
task_inserted_at,
task_retry_count,
retry_after,
tenant_id
)
SELECT
id,
inserted_at,
retry_count,
retry_after,
tenant_id
FROM new_retry_rows;
WITH new_slot_rows AS (
SELECT
nt.id,
nt.inserted_at,
nt.retry_count,
nt.tenant_id,
nt.workflow_run_id,
nt.external_id,
nt.concurrency_parent_strategy_ids[1] AS parent_strategy_id,
CASE
WHEN array_length(nt.concurrency_parent_strategy_ids, 1) > 1 THEN nt.concurrency_parent_strategy_ids[2:array_length(nt.concurrency_parent_strategy_ids, 1)]
ELSE '{}'::bigint[]
END AS next_parent_strategy_ids,
nt.concurrency_strategy_ids[1] AS strategy_id,
CASE
WHEN array_length(nt.concurrency_strategy_ids, 1) > 1 THEN nt.concurrency_strategy_ids[2:array_length(nt.concurrency_strategy_ids, 1)]
ELSE '{}'::bigint[]
END AS next_strategy_ids,
nt.concurrency_keys[1] AS key,
CASE
WHEN array_length(nt.concurrency_keys, 1) > 1 THEN nt.concurrency_keys[2:array_length(nt.concurrency_keys, 1)]
ELSE '{}'::text[]
END AS next_keys,
nt.workflow_id,
nt.workflow_version_id,
nt.queue,
CURRENT_TIMESTAMP + convert_duration_to_interval(nt.schedule_timeout) AS schedule_timeout_at
FROM new_table nt
JOIN old_table ot ON ot.id = nt.id
WHERE nt.initial_state = 'QUEUED'
-- Concurrency strategy id should never be null
AND nt.concurrency_strategy_ids[1] IS NOT NULL
AND (nt.retry_backoff_factor IS NULL OR ot.app_retry_count IS NOT DISTINCT FROM nt.app_retry_count OR nt.app_retry_count = 0)
AND ot.retry_count IS DISTINCT FROM nt.retry_count
), updated_slot AS (
UPDATE
v1_concurrency_slot cs
SET
task_retry_count = nt.retry_count,
schedule_timeout_at = nt.schedule_timeout_at,
is_filled = FALSE,
priority = 4
FROM
new_slot_rows nt
WHERE
cs.task_id = nt.id
AND cs.task_inserted_at = nt.inserted_at
AND cs.strategy_id = nt.strategy_id
RETURNING cs.*
), slots_to_insert AS (
-- select the rows that were not updated
SELECT
nt.*
FROM
new_slot_rows nt
LEFT JOIN
updated_slot cs ON cs.task_id = nt.id AND cs.task_inserted_at = nt.inserted_at AND cs.strategy_id = nt.strategy_id
WHERE
cs.task_id IS NULL
)
INSERT INTO v1_concurrency_slot (
task_id,
task_inserted_at,
task_retry_count,
external_id,
tenant_id,
workflow_id,
workflow_version_id,
workflow_run_id,
parent_strategy_id,
next_parent_strategy_ids,
strategy_id,
next_strategy_ids,
priority,
key,
next_keys,
queue_to_notify,
schedule_timeout_at
)
SELECT
id,
inserted_at,
retry_count,
external_id,
tenant_id,
workflow_id,
workflow_version_id,
workflow_run_id,
parent_strategy_id,
next_parent_strategy_ids,
strategy_id,
next_strategy_ids,
4,
key,
next_keys,
queue,
schedule_timeout_at
FROM slots_to_insert;
INSERT INTO v1_queue_item (
tenant_id,
queue,
task_id,
task_inserted_at,
external_id,
action_id,
step_id,
workflow_id,
workflow_run_id,
schedule_timeout_at,
step_timeout,
priority,
sticky,
desired_worker_id,
retry_count
)
SELECT
nt.tenant_id,
nt.queue,
nt.id,
nt.inserted_at,
nt.external_id,
nt.action_id,
nt.step_id,
nt.workflow_id,
nt.workflow_run_id,
CURRENT_TIMESTAMP + convert_duration_to_interval(nt.schedule_timeout),
nt.step_timeout,
4,
nt.sticky,
nt.desired_worker_id,
nt.retry_count
FROM new_table nt
JOIN old_table ot ON ot.id = nt.id
WHERE nt.initial_state = 'QUEUED'
AND nt.concurrency_strategy_ids[1] IS NULL
AND (nt.retry_backoff_factor IS NULL OR ot.app_retry_count IS NOT DISTINCT FROM nt.app_retry_count OR nt.app_retry_count = 0)
AND ot.retry_count IS DISTINCT FROM nt.retry_count;
RETURN NULL;
END;
$$
LANGUAGE plpgsql;
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
CREATE OR REPLACE FUNCTION v1_task_update_function()
RETURNS TRIGGER AS
$$
BEGIN
WITH new_retry_rows AS (
SELECT
nt.id,
nt.inserted_at,
nt.retry_count,
nt.tenant_id,
-- Convert the retry_after based on min(retry_backoff_factor ^ retry_count, retry_max_backoff)
NOW() + (LEAST(nt.retry_max_backoff, POWER(nt.retry_backoff_factor, nt.app_retry_count)) * interval '1 second') AS retry_after
FROM new_table nt
JOIN old_table ot ON ot.id = nt.id
WHERE nt.initial_state = 'QUEUED'
AND nt.retry_backoff_factor IS NOT NULL
AND ot.app_retry_count IS DISTINCT FROM nt.app_retry_count
AND nt.app_retry_count != 0
)
INSERT INTO v1_retry_queue_item (
task_id,
task_inserted_at,
task_retry_count,
retry_after,
tenant_id
)
SELECT
id,
inserted_at,
retry_count,
retry_after,
tenant_id
FROM new_retry_rows;
WITH new_slot_rows AS (
SELECT
nt.id,
nt.inserted_at,
nt.retry_count,
nt.tenant_id,
nt.workflow_run_id,
nt.external_id,
nt.concurrency_parent_strategy_ids[1] AS parent_strategy_id,
CASE
WHEN array_length(nt.concurrency_parent_strategy_ids, 1) > 1 THEN nt.concurrency_parent_strategy_ids[2:array_length(nt.concurrency_parent_strategy_ids, 1)]
ELSE '{}'::bigint[]
END AS next_parent_strategy_ids,
nt.concurrency_strategy_ids[1] AS strategy_id,
CASE
WHEN array_length(nt.concurrency_strategy_ids, 1) > 1 THEN nt.concurrency_strategy_ids[2:array_length(nt.concurrency_strategy_ids, 1)]
ELSE '{}'::bigint[]
END AS next_strategy_ids,
nt.concurrency_keys[1] AS key,
CASE
WHEN array_length(nt.concurrency_keys, 1) > 1 THEN nt.concurrency_keys[2:array_length(nt.concurrency_keys, 1)]
ELSE '{}'::text[]
END AS next_keys,
nt.workflow_id,
nt.workflow_version_id,
nt.queue,
CURRENT_TIMESTAMP + convert_duration_to_interval(nt.schedule_timeout) AS schedule_timeout_at
FROM new_table nt
JOIN old_table ot ON ot.id = nt.id
WHERE nt.initial_state = 'QUEUED'
-- Concurrency strategy id should never be null
AND nt.concurrency_strategy_ids[1] IS NOT NULL
AND (nt.retry_backoff_factor IS NULL OR ot.app_retry_count IS NOT DISTINCT FROM nt.app_retry_count OR nt.app_retry_count = 0)
AND ot.retry_count IS DISTINCT FROM nt.retry_count
), updated_slot AS (
UPDATE
v1_concurrency_slot cs
SET
task_retry_count = nt.retry_count,
is_filled = FALSE,
priority = 4
FROM
new_slot_rows nt
WHERE
cs.task_id = nt.id
AND cs.task_inserted_at = nt.inserted_at
AND cs.strategy_id = nt.strategy_id
RETURNING cs.*
), slots_to_insert AS (
-- select the rows that were not updated
SELECT
nt.*
FROM
new_slot_rows nt
LEFT JOIN
updated_slot cs ON cs.task_id = nt.id AND cs.task_inserted_at = nt.inserted_at AND cs.strategy_id = nt.strategy_id
WHERE
cs.task_id IS NULL
)
INSERT INTO v1_concurrency_slot (
task_id,
task_inserted_at,
task_retry_count,
external_id,
tenant_id,
workflow_id,
workflow_version_id,
workflow_run_id,
parent_strategy_id,
next_parent_strategy_ids,
strategy_id,
next_strategy_ids,
priority,
key,
next_keys,
queue_to_notify,
schedule_timeout_at
)
SELECT
id,
inserted_at,
retry_count,
external_id,
tenant_id,
workflow_id,
workflow_version_id,
workflow_run_id,
parent_strategy_id,
next_parent_strategy_ids,
strategy_id,
next_strategy_ids,
4,
key,
next_keys,
queue,
schedule_timeout_at
FROM slots_to_insert;
INSERT INTO v1_queue_item (
tenant_id,
queue,
task_id,
task_inserted_at,
external_id,
action_id,
step_id,
workflow_id,
workflow_run_id,
schedule_timeout_at,
step_timeout,
priority,
sticky,
desired_worker_id,
retry_count
)
SELECT
nt.tenant_id,
nt.queue,
nt.id,
nt.inserted_at,
nt.external_id,
nt.action_id,
nt.step_id,
nt.workflow_id,
nt.workflow_run_id,
CURRENT_TIMESTAMP + convert_duration_to_interval(nt.schedule_timeout),
nt.step_timeout,
4,
nt.sticky,
nt.desired_worker_id,
nt.retry_count
FROM new_table nt
JOIN old_table ot ON ot.id = nt.id
WHERE nt.initial_state = 'QUEUED'
AND nt.concurrency_strategy_ids[1] IS NULL
AND (nt.retry_backoff_factor IS NULL OR ot.app_retry_count IS NOT DISTINCT FROM nt.app_retry_count OR nt.app_retry_count = 0)
AND ot.retry_count IS DISTINCT FROM nt.retry_count;
RETURN NULL;
END;
$$
LANGUAGE plpgsql;
-- +goose StatementEnd
+1
View File
@@ -992,6 +992,7 @@ BEGIN
v1_concurrency_slot cs
SET
task_retry_count = nt.retry_count,
schedule_timeout_at = nt.schedule_timeout_at,
is_filled = FALSE,
priority = 4
FROM