diff --git a/cmd/hatchet-migrate/migrate/migrations/20250319143644_v1_0_4.sql b/cmd/hatchet-migrate/migrate/migrations/20250319143644_v1_0_4.sql new file mode 100644 index 000000000..fd7fca7bf --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20250319143644_v1_0_4.sql @@ -0,0 +1,283 @@ +-- +goose Up +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION v1_task_update_function() +RETURNS TRIGGER AS +$$ +BEGIN + WITH new_retry_rows AS ( + SELECT + nt.id, + nt.inserted_at, + nt.retry_count, + nt.tenant_id, + -- Convert the retry_after based on min(retry_backoff_factor ^ retry_count, retry_max_backoff) + NOW() + (LEAST(nt.retry_max_backoff, POWER(nt.retry_backoff_factor, nt.app_retry_count)) * interval '1 second') AS retry_after + FROM new_table nt + JOIN old_table ot ON ot.id = nt.id + WHERE nt.initial_state = 'QUEUED' + AND nt.retry_backoff_factor IS NOT NULL + AND ot.app_retry_count IS DISTINCT FROM nt.app_retry_count + AND nt.app_retry_count != 0 + ) + INSERT INTO v1_retry_queue_item ( + task_id, + task_inserted_at, + task_retry_count, + retry_after, + tenant_id + ) + SELECT + id, + inserted_at, + retry_count, + retry_after, + tenant_id + FROM new_retry_rows; + + WITH new_slot_rows AS ( + SELECT + nt.id, + nt.inserted_at, + nt.retry_count, + nt.tenant_id, + nt.workflow_run_id, + nt.external_id, + nt.concurrency_parent_strategy_ids[1] AS parent_strategy_id, + CASE + WHEN array_length(nt.concurrency_parent_strategy_ids, 1) > 1 THEN nt.concurrency_parent_strategy_ids[2:array_length(nt.concurrency_parent_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_parent_strategy_ids, + nt.concurrency_strategy_ids[1] AS strategy_id, + CASE + WHEN array_length(nt.concurrency_strategy_ids, 1) > 1 THEN nt.concurrency_strategy_ids[2:array_length(nt.concurrency_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_strategy_ids, + nt.concurrency_keys[1] AS key, + CASE + WHEN array_length(nt.concurrency_keys, 1) > 1 THEN nt.concurrency_keys[2:array_length(nt.concurrency_keys, 1)] + ELSE '{}'::text[] + END AS next_keys, + nt.workflow_id, + nt.workflow_version_id, + nt.queue, + CURRENT_TIMESTAMP + convert_duration_to_interval(nt.schedule_timeout) AS schedule_timeout_at + FROM new_table nt + JOIN old_table ot ON ot.id = nt.id + WHERE nt.initial_state = 'QUEUED' + AND nt.concurrency_strategy_ids[1] IS NOT NULL + AND (nt.retry_backoff_factor IS NULL OR ot.app_retry_count IS NOT DISTINCT FROM nt.app_retry_count OR nt.app_retry_count = 0) + AND ot.retry_count IS DISTINCT FROM nt.retry_count + ) + UPDATE + v1_concurrency_slot cs + SET + task_retry_count = nt.retry_count, + is_filled = FALSE, + priority = 4 + FROM + new_slot_rows nt + WHERE + cs.task_id = nt.id + AND cs.task_inserted_at = nt.inserted_at + AND cs.strategy_id = nt.strategy_id; + + INSERT INTO v1_queue_item ( + tenant_id, + queue, + task_id, + task_inserted_at, + external_id, + action_id, + step_id, + workflow_id, + workflow_run_id, + schedule_timeout_at, + step_timeout, + priority, + sticky, + desired_worker_id, + retry_count + ) + SELECT + nt.tenant_id, + nt.queue, + nt.id, + nt.inserted_at, + nt.external_id, + nt.action_id, + nt.step_id, + nt.workflow_id, + nt.workflow_run_id, + CURRENT_TIMESTAMP + convert_duration_to_interval(nt.schedule_timeout), + nt.step_timeout, + 4, + nt.sticky, + nt.desired_worker_id, + nt.retry_count + FROM new_table nt + JOIN old_table ot ON ot.id = nt.id + WHERE nt.initial_state = 'QUEUED' + AND nt.concurrency_strategy_ids[1] IS NULL + AND (nt.retry_backoff_factor IS NULL OR ot.app_retry_count IS NOT DISTINCT FROM nt.app_retry_count OR nt.app_retry_count = 0) + AND ot.retry_count IS DISTINCT FROM nt.retry_count; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION v1_task_update_function() +RETURNS TRIGGER AS +$$ +BEGIN + WITH new_retry_rows AS ( + SELECT + nt.id, + nt.inserted_at, + nt.retry_count, + nt.tenant_id, + -- Convert the retry_after based on min(retry_backoff_factor ^ retry_count, retry_max_backoff) + NOW() + (LEAST(nt.retry_max_backoff, POWER(nt.retry_backoff_factor, nt.app_retry_count)) * interval '1 second') AS retry_after + FROM new_table nt + JOIN old_table ot ON ot.id = nt.id + WHERE nt.initial_state = 'QUEUED' + AND nt.retry_backoff_factor IS NOT NULL + AND ot.app_retry_count IS DISTINCT FROM nt.app_retry_count + AND nt.app_retry_count != 0 + ) + INSERT INTO v1_retry_queue_item ( + task_id, + task_inserted_at, + task_retry_count, + retry_after, + tenant_id + ) + SELECT + id, + inserted_at, + retry_count, + retry_after, + tenant_id + FROM new_retry_rows; + + WITH new_slot_rows AS ( + SELECT + nt.id, + nt.inserted_at, + nt.retry_count, + nt.tenant_id, + nt.workflow_run_id, + nt.external_id, + nt.concurrency_parent_strategy_ids[1] AS parent_strategy_id, + CASE + WHEN array_length(nt.concurrency_parent_strategy_ids, 1) > 1 THEN nt.concurrency_parent_strategy_ids[2:array_length(nt.concurrency_parent_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_parent_strategy_ids, + nt.concurrency_strategy_ids[1] AS strategy_id, + CASE + WHEN array_length(nt.concurrency_strategy_ids, 1) > 1 THEN nt.concurrency_strategy_ids[2:array_length(nt.concurrency_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_strategy_ids, + nt.concurrency_keys[1] AS key, + CASE + WHEN array_length(nt.concurrency_keys, 1) > 1 THEN nt.concurrency_keys[2:array_length(nt.concurrency_keys, 1)] + ELSE '{}'::text[] + END AS next_keys, + nt.workflow_id, + nt.workflow_version_id, + nt.queue, + CURRENT_TIMESTAMP + convert_duration_to_interval(nt.schedule_timeout) AS schedule_timeout_at + FROM new_table nt + JOIN old_table ot ON ot.id = nt.id + WHERE nt.initial_state = 'QUEUED' + AND nt.concurrency_strategy_ids[1] IS NOT NULL + AND (nt.retry_backoff_factor IS NULL OR ot.app_retry_count IS NOT DISTINCT FROM nt.app_retry_count OR nt.app_retry_count = 0) + AND ot.retry_count IS DISTINCT FROM nt.retry_count + ) + INSERT INTO v1_concurrency_slot ( + task_id, + task_inserted_at, + task_retry_count, + external_id, + tenant_id, + workflow_id, + workflow_version_id, + workflow_run_id, + parent_strategy_id, + next_parent_strategy_ids, + strategy_id, + next_strategy_ids, + priority, + key, + next_keys, + queue_to_notify, + schedule_timeout_at + ) + SELECT + id, + inserted_at, + retry_count, + external_id, + tenant_id, + workflow_id, + workflow_version_id, + workflow_run_id, + parent_strategy_id, + next_parent_strategy_ids, + strategy_id, + next_strategy_ids, + 4, + key, + next_keys, + queue, + schedule_timeout_at + FROM new_slot_rows; + + INSERT INTO v1_queue_item ( + tenant_id, + queue, + task_id, + task_inserted_at, + external_id, + action_id, + step_id, + workflow_id, + workflow_run_id, + schedule_timeout_at, + step_timeout, + priority, + sticky, + desired_worker_id, + retry_count + ) + SELECT + nt.tenant_id, + nt.queue, + nt.id, + nt.inserted_at, + nt.external_id, + nt.action_id, + nt.step_id, + nt.workflow_id, + nt.workflow_run_id, + CURRENT_TIMESTAMP + convert_duration_to_interval(nt.schedule_timeout), + nt.step_timeout, + 4, + nt.sticky, + nt.desired_worker_id, + nt.retry_count + FROM new_table nt + JOIN old_table ot ON ot.id = nt.id + WHERE nt.initial_state = 'QUEUED' + AND nt.concurrency_strategy_ids[1] IS NULL + AND (nt.retry_backoff_factor IS NULL OR ot.app_retry_count IS NOT DISTINCT FROM nt.app_retry_count OR nt.app_retry_count = 0) + AND ot.retry_count IS DISTINCT FROM nt.retry_count; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; +-- +goose StatementEnd diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index 1bd1490e7..6a3687226 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -982,44 +982,18 @@ BEGIN AND (nt.retry_backoff_factor IS NULL OR ot.app_retry_count IS NOT DISTINCT FROM nt.app_retry_count OR nt.app_retry_count = 0) AND ot.retry_count IS DISTINCT FROM nt.retry_count ) - INSERT INTO v1_concurrency_slot ( - task_id, - task_inserted_at, - task_retry_count, - external_id, - tenant_id, - workflow_id, - workflow_version_id, - workflow_run_id, - parent_strategy_id, - next_parent_strategy_ids, - strategy_id, - next_strategy_ids, - priority, - key, - next_keys, - queue_to_notify, - schedule_timeout_at - ) - SELECT - id, - inserted_at, - retry_count, - external_id, - tenant_id, - workflow_id, - workflow_version_id, - workflow_run_id, - parent_strategy_id, - next_parent_strategy_ids, - strategy_id, - next_strategy_ids, - 4, - key, - next_keys, - queue, - schedule_timeout_at - FROM new_slot_rows; + UPDATE + v1_concurrency_slot cs + SET + task_retry_count = nt.retry_count, + is_filled = FALSE, + priority = 4 + FROM + new_slot_rows nt + WHERE + cs.task_id = nt.id + AND cs.task_inserted_at = nt.inserted_at + AND cs.strategy_id = nt.strategy_id; INSERT INTO v1_queue_item ( tenant_id,