feat: durable user event log (#2861)

* placeholder

* feat: db tables for user events (#2862)

* feat: db tables for user events

* move event payloads to payloads table, fix env var loading

* fix: address pr review comments

* missed save

* feat: optimistic scheduling (#2867)

* feat: db tables for user events

* move event payloads to payloads table, fix env var loading

* refactor: small changes to prepare optimistic txs

* feat: optimistic scheduling

* address pr review comments

* rm comments

* fix: rampup test race condition

* fix: goleak

* feat: grpc-side triggers

* fix: config and sem logic

* fix: respect optimistic scheduling env var

* add optimistic to testing matrix, remove pg-only mode

* fix cleanup of pubbuffers

* merge migrations

* last testing fixes
This commit is contained in:
abelanger5
2026-02-02 18:04:02 -05:00
committed by GitHub
parent 7540b295ac
commit d56dee4266
57 changed files with 4663 additions and 1800 deletions
+144 -75
View File
@@ -963,75 +963,78 @@ RETURNS TRIGGER AS $$
DECLARE
rec RECORD;
BEGIN
WITH new_slot_rows AS (
-- Only insert if there's a single task with initial_state = 'QUEUED' and concurrency_strategy_ids is not null
IF (SELECT COUNT(*) FROM new_table WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NOT NULL) > 0 THEN
WITH new_slot_rows AS (
SELECT
id,
inserted_at,
retry_count,
tenant_id,
priority,
concurrency_parent_strategy_ids[1] AS parent_strategy_id,
CASE
WHEN array_length(concurrency_parent_strategy_ids, 1) > 1 THEN concurrency_parent_strategy_ids[2:array_length(concurrency_parent_strategy_ids, 1)]
ELSE '{}'::bigint[]
END AS next_parent_strategy_ids,
concurrency_strategy_ids[1] AS strategy_id,
external_id,
workflow_run_id,
CASE
WHEN array_length(concurrency_strategy_ids, 1) > 1 THEN concurrency_strategy_ids[2:array_length(concurrency_strategy_ids, 1)]
ELSE '{}'::bigint[]
END AS next_strategy_ids,
concurrency_keys[1] AS key,
CASE
WHEN array_length(concurrency_keys, 1) > 1 THEN concurrency_keys[2:array_length(concurrency_keys, 1)]
ELSE '{}'::text[]
END AS next_keys,
workflow_id,
workflow_version_id,
queue,
CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout) AS schedule_timeout_at
FROM new_table
WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NOT NULL
)
INSERT INTO v1_concurrency_slot (
task_id,
task_inserted_at,
task_retry_count,
external_id,
tenant_id,
workflow_id,
workflow_version_id,
workflow_run_id,
parent_strategy_id,
next_parent_strategy_ids,
strategy_id,
next_strategy_ids,
priority,
key,
next_keys,
queue_to_notify,
schedule_timeout_at
)
SELECT
id,
inserted_at,
retry_count,
tenant_id,
priority,
concurrency_parent_strategy_ids[1] AS parent_strategy_id,
CASE
WHEN array_length(concurrency_parent_strategy_ids, 1) > 1 THEN concurrency_parent_strategy_ids[2:array_length(concurrency_parent_strategy_ids, 1)]
ELSE '{}'::bigint[]
END AS next_parent_strategy_ids,
concurrency_strategy_ids[1] AS strategy_id,
external_id,
workflow_run_id,
CASE
WHEN array_length(concurrency_strategy_ids, 1) > 1 THEN concurrency_strategy_ids[2:array_length(concurrency_strategy_ids, 1)]
ELSE '{}'::bigint[]
END AS next_strategy_ids,
concurrency_keys[1] AS key,
CASE
WHEN array_length(concurrency_keys, 1) > 1 THEN concurrency_keys[2:array_length(concurrency_keys, 1)]
ELSE '{}'::text[]
END AS next_keys,
tenant_id,
workflow_id,
workflow_version_id,
workflow_run_id,
parent_strategy_id,
next_parent_strategy_ids,
strategy_id,
next_strategy_ids,
COALESCE(priority, 1),
key,
next_keys,
queue,
CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout) AS schedule_timeout_at
FROM new_table
WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NOT NULL
)
INSERT INTO v1_concurrency_slot (
task_id,
task_inserted_at,
task_retry_count,
external_id,
tenant_id,
workflow_id,
workflow_version_id,
workflow_run_id,
parent_strategy_id,
next_parent_strategy_ids,
strategy_id,
next_strategy_ids,
priority,
key,
next_keys,
queue_to_notify,
schedule_timeout_at
)
SELECT
id,
inserted_at,
retry_count,
external_id,
tenant_id,
workflow_id,
workflow_version_id,
workflow_run_id,
parent_strategy_id,
next_parent_strategy_ids,
strategy_id,
next_strategy_ids,
COALESCE(priority, 1),
key,
next_keys,
queue,
schedule_timeout_at
FROM new_slot_rows;
schedule_timeout_at
FROM new_slot_rows;
END IF;
INSERT INTO v1_queue_item (
tenant_id,
@@ -1069,19 +1072,22 @@ BEGIN
FROM new_table
WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NULL;
INSERT INTO v1_dag_to_task (
dag_id,
dag_inserted_at,
task_id,
task_inserted_at
)
SELECT
dag_id,
dag_inserted_at,
id,
inserted_at
FROM new_table
WHERE dag_id IS NOT NULL AND dag_inserted_at IS NOT NULL;
-- Only insert into v1_dag and v1_dag_to_task if dag_id and dag_inserted_at are not null
IF (SELECT COUNT(*) FROM new_table WHERE dag_id IS NOT NULL AND dag_inserted_at IS NOT NULL) > 0 THEN
INSERT INTO v1_dag_to_task (
dag_id,
dag_inserted_at,
task_id,
task_inserted_at
)
SELECT
dag_id,
dag_inserted_at,
id,
inserted_at
FROM new_table
WHERE dag_id IS NOT NULL AND dag_inserted_at IS NOT NULL;
END IF;
INSERT INTO v1_lookup_table (
external_id,
@@ -1640,7 +1646,7 @@ CREATE TABLE v1_durable_sleep (
PRIMARY KEY (tenant_id, sleep_until, id)
);
CREATE TYPE v1_payload_type AS ENUM ('TASK_INPUT', 'DAG_INPUT', 'TASK_OUTPUT', 'TASK_EVENT_DATA');
CREATE TYPE v1_payload_type AS ENUM ('TASK_INPUT', 'DAG_INPUT', 'TASK_OUTPUT', 'TASK_EVENT_DATA', 'USER_EVENT_INPUT');
-- IMPORTANT: Keep these values in sync with `v1_payload_type_olap` in the OLAP db
CREATE TYPE v1_payload_location AS ENUM ('INLINE', 'EXTERNAL');
@@ -2146,3 +2152,66 @@ CREATE TABLE v1_operation_interval_settings (
interval_nanoseconds BIGINT NOT NULL,
PRIMARY KEY (tenant_id, operation_id)
);
-- Events tables
CREATE TABLE v1_event (
id bigint GENERATED ALWAYS AS IDENTITY,
seen_at TIMESTAMPTZ NOT NULL,
tenant_id UUID NOT NULL,
external_id UUID NOT NULL DEFAULT gen_random_uuid(),
key TEXT NOT NULL,
additional_metadata JSONB,
scope TEXT,
triggering_webhook_name TEXT,
PRIMARY KEY (tenant_id, seen_at, id)
) PARTITION BY RANGE(seen_at);
CREATE INDEX v1_event_key_idx ON v1_event (tenant_id, key);
CREATE TABLE v1_event_lookup_table (
tenant_id UUID NOT NULL,
external_id UUID NOT NULL,
event_id BIGINT NOT NULL,
event_seen_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (external_id, event_seen_at)
) PARTITION BY RANGE(event_seen_at);
CREATE OR REPLACE FUNCTION v1_event_lookup_table_insert_function()
RETURNS TRIGGER AS
$$
BEGIN
INSERT INTO v1_event_lookup_table (
tenant_id,
external_id,
event_id,
event_seen_at
)
SELECT
tenant_id,
external_id,
id,
seen_at
FROM new_rows
ON CONFLICT (external_id, event_seen_at) DO NOTHING;
RETURN NULL;
END;
$$
LANGUAGE plpgsql;
CREATE TRIGGER v1_event_lookup_table_insert_trigger
AFTER INSERT ON v1_event
REFERENCING NEW TABLE AS new_rows
FOR EACH STATEMENT
EXECUTE FUNCTION v1_event_lookup_table_insert_function();
CREATE TABLE v1_event_to_run (
run_external_id UUID NOT NULL,
event_id BIGINT NOT NULL,
event_seen_at TIMESTAMPTZ NOT NULL,
filter_id UUID,
PRIMARY KEY (event_id, event_seen_at, run_external_id)
) PARTITION BY RANGE(event_seen_at);