Files
hatchet/sql/schema/v1-core.sql
matt c6e154fd03 Feat: OLAP Payloads (#2410)
* feat: olap payloads table

* feat: olap queue messages for payload puts

* feat: wire up writes on task write

* driveby: add + ignore psql-connect

* fix: down migration

* fix: use external id for pk

* fix: insert query

* fix: more external ids

* fix: bit more cleanup

* feat: dags

* fix: the rest of the refs

* fix: placeholder uuid

* fix: write external ids

* feat: wire up messages over the queue

* fix: panic

* Revert "fix: panic"

This reverts commit c0adccf2ea.

* Revert "feat: wire up messages over the queue"

This reverts commit 36f425f3c1.

* fix: rm unused method

* fix: rm more

* fix: rm cruft

* feat: wire up failures

* feat: start wiring up completed events

* fix: more wiring

* fix: finish wiring up completed event payloads

* fix: lint

* feat: start wiring up external ids in the core

* feat: olap pub

* fix: add returning

* fix: wiring

* debug: log lines for pubs

* fix: external id writes

* Revert "debug: log lines for pubs"

This reverts commit fe430840bd.

* fix: rm sample

* debug: rm pub buffer param

* Revert "debug: rm pub buffer param"

This reverts commit b42a5cacbb.

* debug: stuck queries

* debug: more logs

* debug: yet more logs

* fix: rename BulkRetrieve -> Retrieve

* chore: lint

* fix: naming

* fix: conn leak in putpayloads

* fix: revert debug

* Revert "debug: more logs"

This reverts commit 95da7de64f.

* Revert "debug: stuck queries"

This reverts commit 8fda64adc4.

* feat: improve getters, olap getter

* fix: key type

* feat: first pass at pulling olap payloads from the payload store

* fix: start fixing bugs

* fix: start reworking `includePayloads` param

* fix: include payloads wiring

* feat: analyze for payloads

* fix: simplify writes more + write event payloads

* feat: read out event payloads

* feat: env vars for dual writes

* refactor: clean up task prop drilling a bit

* feat: add include payloads params to python for tests

* fix: tx commit

* fix: dual writes

* fix: not null constraint

* fix: one more

* debug: logging

* fix: more debugging, tweak function sig

* fix: function sig

* fix: refs

* debug: more logging

* debug: more logging

* debug: fix condition

* debug: overwrite properly

* fix: revert debug

* fix: rm more drilling

* fix: comments

* fix: partitioning jobs

* chore: ver

* fix: bug, docs

* hack: dummy id and inserted at for payload offloads

* fix: bug

* fix: no need to handle offloads for task event data

* hack: jitter + current ts

* fix: short circuit

* fix: offload payloads in a tx

* fix: uncomment sampling

* fix: don't offload if external store is disabled

* chore: gen sqlc

* fix: migration

* fix: start reworking types

* fix: couple more

* fix: rm unused code

* fix: drill includePayloads down again

* fix: silence annoying error in some cases

* fix: always store payloads

* debug: use workflow run id for input

* fix: improve logging

* debug: logging on retrieve

* debug: task input

* fix: use correct field

* debug: write even null payloads to limit errors

* debug: hide error lines

* fix: quieting more errors

* fix: duplicate example names, remove print lines

* debug: add logging for olap event writes

* hack: immediate event offloads and cutovers

* fix: rm log line

* fix: import

* fix: short circuit events

* fix: duped names
2025-10-20 09:09:49 -04:00

1772 lines
55 KiB
PL/PgSQL

-- Lists the child partitions of targetTableName whose name carries a
-- `<targetTableName>_YYYYMMDD` date suffix that is strictly earlier than targetDate.
--
-- targetTableName: name of the partitioned parent (resolved through ::regclass).
-- targetDate:      exclusive upper bound for the date encoded in the partition name.
-- Returns one row per matching partition, holding the partition's name as text.
CREATE OR REPLACE FUNCTION get_v1_partitions_before_date(
    targetTableName text,
    targetDate date
) RETURNS TABLE(partition_name text)
LANGUAGE plpgsql AS
$$
DECLARE
    -- POSIX pattern whose single capture group is the 8-digit date suffix.
    suffixPattern text := format('%s_(\d{8})', targetTableName);
BEGIN
    RETURN QUERY
    SELECT
        child.child_name
    FROM (
        SELECT
            i.inhrelid::regclass::text AS child_name,
            -- NULL when the child name does not contain the expected suffix
            substring(i.inhrelid::regclass::text, suffixPattern) AS date_suffix
        FROM
            pg_inherits AS i
        WHERE
            i.inhparent = targetTableName::regclass
    ) AS child
    WHERE
        child.date_suffix ~ '^\d{8}'
        AND (child.date_suffix::date) < targetDate;
END;
$$;
-- Lists the child partitions of targetTableName whose `<targetTableName>_YYYYMMDD`
-- name suffix is strictly earlier than targetDate AND earlier than one week before
-- the current time.
--
-- targetTableName: name of the partitioned parent (resolved through ::regclass).
-- targetDate:      exclusive upper bound for the date encoded in the partition name.
-- Returns one row per matching partition, holding the partition's name as text.
CREATE OR REPLACE FUNCTION get_v1_weekly_partitions_before_date(
    targetTableName text,
    targetDate date
) RETURNS TABLE(partition_name text)
LANGUAGE plpgsql AS
$$
DECLARE
    -- POSIX pattern whose single capture group is the 8-digit date suffix.
    suffixPattern text := format('%s_(\d{8})', targetTableName);
BEGIN
    RETURN QUERY
    SELECT
        child.child_name
    FROM (
        SELECT
            i.inhrelid::regclass::text AS child_name,
            -- NULL when the child name does not contain the expected suffix
            substring(i.inhrelid::regclass::text, suffixPattern) AS date_suffix
        FROM
            pg_inherits AS i
        WHERE
            i.inhparent = targetTableName::regclass
    ) AS child
    WHERE
        child.date_suffix ~ '^\d{8}'
        AND (child.date_suffix::date) < targetDate
        -- extra guard: the encoded date must be more than a week old
        AND (child.date_suffix::date) < NOW() - INTERVAL '1 week'
    ;
END;
$$;
-- Creates and attaches a one-day range partition `<targetTableName>_YYYYMMDD`
-- (lower-cased) covering [targetDate, targetDate + 1 day).
--
-- Returns 0 without doing anything when pg_tables already lists a table with the
-- partition's name, otherwise 1 after the partition has been created, given its
-- autovacuum storage parameters, and attached to targetTableName.
--
-- NOTE(review): identifiers are interpolated with %s (not %I), so callers are
-- presumably expected to pass trusted table names -- confirm.
CREATE OR REPLACE FUNCTION create_v1_range_partition(
    targetTableName text,
    targetDate date
) RETURNS integer
LANGUAGE plpgsql AS
$$
DECLARE
    dayStartStr varchar := to_char(targetDate, 'YYYYMMDD');
    dayEndStr varchar := to_char(targetDate + INTERVAL '1 day', 'YYYYMMDD');
    partitionName varchar := lower(format('%s_%s', targetTableName, dayStartStr));
BEGIN
    -- nothing to do when a table with that name is already present
    PERFORM 1 FROM pg_tables WHERE tablename = partitionName;
    IF FOUND THEN
        RETURN 0;
    END IF;

    -- build the standalone table first, then tune it, then attach it
    EXECUTE format('CREATE TABLE %s (LIKE %s INCLUDING INDEXES INCLUDING CONSTRAINTS)', partitionName, targetTableName);

    EXECUTE format('ALTER TABLE %s SET (
autovacuum_vacuum_scale_factor = ''0.1'',
autovacuum_analyze_scale_factor=''0.05'',
autovacuum_vacuum_threshold=''25'',
autovacuum_analyze_threshold=''25'',
autovacuum_vacuum_cost_delay=''10'',
autovacuum_vacuum_cost_limit=''1000''
)', partitionName);

    EXECUTE format('ALTER TABLE %s ATTACH PARTITION %s FOR VALUES FROM (''%s'') TO (''%s'')', targetTableName, partitionName, dayStartStr, dayEndStr);

    RETURN 1;
END;
$$;
-- Creates and attaches a one-week range partition for the week containing
-- targetDate. The partition is named `<targetTableName>_YYYYMMDD` (lower-cased),
-- where the date is date_trunc('week', targetDate), and covers one week from there.
--
-- Returns 0 without doing anything when pg_tables already lists a table with the
-- partition's name, otherwise 1 after the partition has been created, given its
-- autovacuum storage parameters, and attached to targetTableName.
--
-- NOTE(review): identifiers are interpolated with %s (not %I), so callers are
-- presumably expected to pass trusted table names -- confirm.
CREATE OR REPLACE FUNCTION create_v1_weekly_range_partition(
    targetTableName text,
    targetDate date
) RETURNS integer
LANGUAGE plpgsql AS
$$
DECLARE
    weekStartStr varchar := to_char(date_trunc('week', targetDate), 'YYYYMMDD');
    weekEndStr varchar := to_char(date_trunc('week', targetDate) + INTERVAL '1 week', 'YYYYMMDD');
    partitionName varchar := lower(format('%s_%s', targetTableName, weekStartStr));
BEGIN
    -- nothing to do when a table with that name is already present
    PERFORM 1 FROM pg_tables WHERE tablename = partitionName;
    IF FOUND THEN
        RETURN 0;
    END IF;

    -- build the standalone table first, then tune it, then attach it
    EXECUTE format('CREATE TABLE %s (LIKE %s INCLUDING INDEXES)', partitionName, targetTableName);

    EXECUTE format('ALTER TABLE %s SET (
autovacuum_vacuum_scale_factor = ''0.1'',
autovacuum_analyze_scale_factor=''0.05'',
autovacuum_vacuum_threshold=''25'',
autovacuum_analyze_threshold=''25'',
autovacuum_vacuum_cost_delay=''10'',
autovacuum_vacuum_cost_limit=''1000''
)', partitionName);

    EXECUTE format('ALTER TABLE %s ATTACH PARTITION %s FOR VALUES FROM (''%s'') TO (''%s'')', targetTableName, partitionName, weekStartStr, weekEndStr);

    RETURN 1;
END;
$$;
-- https://stackoverflow.com/questions/8137112/unnest-array-by-one-level
-- Unnests an array by exactly one dimension: each returned row is one
-- slice (dimension reduced by one) of the input. An empty input array yields a
-- single empty-array row; a NULL input yields no rows because the function is STRICT.
CREATE OR REPLACE FUNCTION unnest_nd_1d(a anyarray, OUT a_1d anyarray)
RETURNS SETOF anyarray
LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE STRICT AS
$func$
BEGIN
    -- NULL input never reaches this point (STRICT)
    IF a <> '{}' THEN
        -- non-empty: emit every slice in turn
        FOREACH a_1d SLICE 1 IN ARRAY a LOOP
            RETURN NEXT;
        END LOOP;
    ELSE
        -- empty input: emit one empty array
        a_1d := '{}';
        RETURN NEXT;
    END IF;
END
$func$;
-- CreateTable
-- v1_queue: one row per queue, identified by (tenant_id, name).
CREATE TABLE v1_queue (
tenant_id UUID NOT NULL,
name TEXT NOT NULL,
-- Nullable; presumably the last time the queue saw activity -- TODO confirm against writers.
last_active TIMESTAMP(3),
CONSTRAINT v1_queue_pkey PRIMARY KEY (tenant_id, name)
);
-- Type of the `sticky` column on v1_task, v1_queue_item and v1_rate_limited_queue_items.
CREATE TYPE v1_sticky_strategy AS ENUM ('NONE', 'SOFT', 'HARD');
-- Type of v1_task.initial_state (which defaults to 'QUEUED').
CREATE TYPE v1_task_initial_state AS ENUM ('QUEUED', 'CANCELLED', 'SKIPPED', 'FAILED');
-- We need a NONE strategy to allow for tasks which were previously using a concurrency strategy to
-- enqueue if the strategy is removed.
CREATE TYPE v1_concurrency_strategy AS ENUM ('NONE', 'GROUP_ROUND_ROBIN', 'CANCEL_IN_PROGRESS', 'CANCEL_NEWEST');
-- v1_workflow_concurrency: a concurrency strategy defined at the workflow level.
-- Rows are inserted by the create_v1_step_concurrency trigger function.
CREATE TABLE v1_workflow_concurrency (
-- We need an id used for stable ordering to prevent deadlocks. We must process all concurrency
-- strategies on a workflow in the same order.
id bigint GENERATED ALWAYS AS IDENTITY,
workflow_id UUID NOT NULL,
workflow_version_id UUID NOT NULL,
-- If the strategy is NONE and we've removed all concurrency slots, we can set is_active to false
is_active BOOLEAN NOT NULL DEFAULT TRUE,
strategy v1_concurrency_strategy NOT NULL,
-- ids of the v1_step_concurrency rows created for this strategy (set by create_v1_step_concurrency)
child_strategy_ids BIGINT[],
expression TEXT NOT NULL,
tenant_id UUID NOT NULL,
max_concurrency INTEGER NOT NULL,
CONSTRAINT v1_workflow_concurrency_pkey PRIMARY KEY (workflow_id, workflow_version_id, id)
);
-- v1_step_concurrency: a concurrency strategy attached to a single step of a workflow version.
CREATE TABLE v1_step_concurrency (
-- We need an id used for stable ordering to prevent deadlocks. We must process all concurrency
-- strategies on a step in the same order.
id bigint GENERATED ALWAYS AS IDENTITY,
-- The parent_strategy_id exists if concurrency is defined at the workflow level
-- (it then holds the id of the v1_workflow_concurrency row)
parent_strategy_id BIGINT,
workflow_id UUID NOT NULL,
workflow_version_id UUID NOT NULL,
step_id UUID NOT NULL,
-- If the strategy is NONE and we've removed all concurrency slots, we can set is_active to false
-- (after_v1_concurrency_slot_insert_function flips it back to true when a slot is inserted)
is_active BOOLEAN NOT NULL DEFAULT TRUE,
strategy v1_concurrency_strategy NOT NULL,
expression TEXT NOT NULL,
tenant_id UUID NOT NULL,
max_concurrency INTEGER NOT NULL,
CONSTRAINT v1_step_concurrency_pkey PRIMARY KEY (workflow_id, workflow_version_id, step_id, id)
);
-- Row-level AFTER INSERT trigger function for "WorkflowConcurrency".
--
-- When the new row has a concurrency group expression, it mirrors that row into
-- the v1 tables:
--   1. one v1_workflow_concurrency row for the workflow version,
--   2. one v1_step_concurrency row per step of the version's 'DEFAULT' jobs,
--      each pointing back at (1) through parent_strategy_id,
--   3. the ids from (2) are stored on (1) as child_strategy_ids.
-- Rows without a concurrency group expression are ignored. Always returns NEW.
CREATE OR REPLACE FUNCTION create_v1_step_concurrency()
RETURNS trigger AS $$
DECLARE
    parent_strategy v1_workflow_concurrency%ROWTYPE;
    step_strategy_ids bigint[];
BEGIN
    -- Nothing to mirror without a concurrency group expression.
    IF NEW."concurrencyGroupExpression" IS NULL THEN
        RETURN NEW;
    END IF;

    -- (1) workflow-level strategy; keep the inserted row for the steps below.
    INSERT INTO v1_workflow_concurrency (
        workflow_id,
        workflow_version_id,
        strategy,
        expression,
        tenant_id,
        max_concurrency
    )
    SELECT
        wf."id",
        wv."id",
        NEW."limitStrategy"::VARCHAR::v1_concurrency_strategy,
        NEW."concurrencyGroupExpression",
        wf."tenantId",
        NEW."maxRuns"
    FROM "WorkflowVersion" wv
    JOIN "Workflow" wf ON wv."workflowId" = wf."id"
    WHERE wv."id" = NEW."workflowVersionId"
    RETURNING * INTO parent_strategy;

    -- (2) one step-level strategy per step, collecting the generated ids.
    WITH inserted_steps AS (
        INSERT INTO v1_step_concurrency (
            parent_strategy_id,
            workflow_id,
            workflow_version_id,
            step_id,
            strategy,
            expression,
            tenant_id,
            max_concurrency
        )
        SELECT
            parent_strategy.id,
            wf."id",
            wv."id",
            s."id",
            NEW."limitStrategy"::VARCHAR::v1_concurrency_strategy,
            NEW."concurrencyGroupExpression",
            wf."tenantId",
            NEW."maxRuns"
        FROM "Step" s
        JOIN "Job" j ON s."jobId" = j."id"
        JOIN "WorkflowVersion" wv ON j."workflowVersionId" = wv."id"
        JOIN "Workflow" wf ON wv."workflowId" = wf."id"
        WHERE
            wv."id" = NEW."workflowVersionId"
            AND j."kind" = 'DEFAULT'
        RETURNING id
    )
    SELECT array_remove(array_agg(ins.id), NULL)::bigint[] INTO step_strategy_ids
    FROM inserted_steps ins;

    -- (3) record the children on the parent, addressed by its full primary key.
    UPDATE v1_workflow_concurrency
    SET child_strategy_ids = step_strategy_ids
    WHERE workflow_id = parent_strategy.workflow_id
      AND workflow_version_id = parent_strategy.workflow_version_id
      AND id = parent_strategy.id;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_create_v1_step_concurrency
AFTER INSERT ON "WorkflowConcurrency"
FOR EACH ROW
EXECUTE FUNCTION create_v1_step_concurrency();
-- CreateTable
-- v1_task: one row per task. Range-partitioned on inserted_at, which is therefore
-- part of the primary key alongside id.
CREATE TABLE v1_task (
id bigint GENERATED ALWAYS AS IDENTITY,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
tenant_id UUID NOT NULL,
queue TEXT NOT NULL,
action_id TEXT NOT NULL,
step_id UUID NOT NULL,
step_readable_id TEXT NOT NULL,
workflow_id UUID NOT NULL,
workflow_version_id UUID NOT NULL,
workflow_run_id UUID NOT NULL,
-- duration text; v1_task_insert_function passes it to convert_duration_to_interval
schedule_timeout TEXT NOT NULL,
step_timeout TEXT,
-- nullable; v1_task_insert_function falls back to 1 via COALESCE when copying it
priority INTEGER DEFAULT 1,
sticky v1_sticky_strategy NOT NULL,
desired_worker_id UUID,
external_id UUID NOT NULL,
display_name TEXT NOT NULL,
input JSONB NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
internal_retry_count INTEGER NOT NULL DEFAULT 0,
app_retry_count INTEGER NOT NULL DEFAULT 0,
-- step_index is relevant for tracking down the correct SIGNAL_COMPLETED event on a
-- replay of a child workflow
step_index BIGINT NOT NULL,
additional_metadata JSONB,
-- when both dag_id and dag_inserted_at are set, v1_task_insert_function records the
-- pairing in v1_dag_to_task
dag_id BIGINT,
dag_inserted_at TIMESTAMPTZ,
parent_task_external_id UUID,
parent_task_id BIGINT,
parent_task_inserted_at TIMESTAMPTZ,
child_index BIGINT,
child_key TEXT,
-- only 'QUEUED' tasks get a queue item or concurrency slot in v1_task_insert_function
initial_state v1_task_initial_state NOT NULL DEFAULT 'QUEUED',
initial_state_reason TEXT,
-- parallel arrays: element 1 of each seeds the task's first v1_concurrency_slot and the
-- remaining elements are carried forward as that slot's next_* columns
concurrency_parent_strategy_ids BIGINT[],
concurrency_strategy_ids BIGINT[],
concurrency_keys TEXT[],
retry_backoff_factor DOUBLE PRECISION,
retry_max_backoff INTEGER,
CONSTRAINT v1_task_pkey PRIMARY KEY (id, inserted_at)
) PARTITION BY RANGE(inserted_at);
-- v1_lookup_table: maps an external_id to the internal (id, inserted_at) of a task or DAG.
-- v1_task_insert_function writes the task rows (task_id set); cleanup_workflow_concurrency_slots
-- also reads rows with dag_id set.
CREATE TABLE v1_lookup_table (
tenant_id UUID NOT NULL,
external_id UUID NOT NULL,
task_id BIGINT,
dag_id BIGINT,
inserted_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (external_id)
);
-- Kinds of event recorded in v1_task_event.event_type.
CREATE TYPE v1_task_event_type AS ENUM (
'COMPLETED',
'FAILED',
'CANCELLED',
'SIGNAL_CREATED',
'SIGNAL_COMPLETED'
);
-- CreateTable
-- v1_task_event: events recorded against a task. Range-partitioned on task_inserted_at.
CREATE TABLE v1_task_event (
id bigint GENERATED ALWAYS AS IDENTITY,
-- NOTE: has a default but, unlike the other columns here, no NOT NULL constraint
inserted_at timestamptz DEFAULT CURRENT_TIMESTAMP,
tenant_id UUID NOT NULL,
task_id bigint NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
retry_count INTEGER NOT NULL,
event_type v1_task_event_type NOT NULL,
-- The event key is an optional field that can be used to uniquely identify an event. This is
-- used for signal events to ensure that we don't create duplicate signals.
event_key TEXT,
created_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
data JSONB,
external_id UUID,
CONSTRAINT v1_task_event_pkey PRIMARY KEY (task_id, task_inserted_at, id)
) PARTITION BY RANGE(task_inserted_at);
-- Create unique index on (tenant_id, task_id, task_inserted_at, event_type, event_key)
-- when event_key is not null
CREATE UNIQUE INDEX v1_task_event_event_key_unique_idx ON v1_task_event (
tenant_id ASC,
task_id ASC,
task_inserted_at ASC,
event_type ASC,
event_key ASC
) WHERE event_key IS NOT NULL;
-- CreateTable
-- v1_task_expression_eval: per-task values keyed by (kind, key); a value can be stored
-- as text (value_str) and/or integer (value_int), both nullable.
-- NOTE(review): presumably these are results of evaluated step expressions -- confirm.
CREATE TABLE v1_task_expression_eval (
key TEXT NOT NULL,
task_id BIGINT NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
value_str TEXT,
value_int INTEGER,
-- "StepExpressionKind" is a type defined outside this file section
kind "StepExpressionKind" NOT NULL,
CONSTRAINT v1_task_expression_eval_pkey PRIMARY KEY (task_id, task_inserted_at, kind, key)
);
-- CreateTable
-- NOTE: changes to v1_queue_item should be reflected in v1_rate_limited_queue_items
-- v1_queue_item: a queued task. v1_task_insert_function inserts a row for every new
-- 'QUEUED' task that has no concurrency strategy.
CREATE TABLE v1_queue_item (
id bigint GENERATED ALWAYS AS IDENTITY,
tenant_id UUID NOT NULL,
queue TEXT NOT NULL,
task_id bigint NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
external_id UUID NOT NULL,
action_id TEXT NOT NULL,
step_id UUID NOT NULL,
workflow_id UUID NOT NULL,
workflow_run_id UUID NOT NULL,
schedule_timeout_at TIMESTAMP(3),
step_timeout TEXT,
priority INTEGER NOT NULL DEFAULT 1,
sticky v1_sticky_strategy NOT NULL,
desired_worker_id UUID,
retry_count INTEGER NOT NULL DEFAULT 0,
CONSTRAINT v1_queue_item_pkey PRIMARY KEY (id)
);
-- Table-level autovacuum overrides for v1_queue_item.
alter table v1_queue_item set (
autovacuum_vacuum_scale_factor = '0.1',
autovacuum_analyze_scale_factor='0.05',
autovacuum_vacuum_threshold='25',
autovacuum_analyze_threshold='25',
autovacuum_vacuum_cost_delay='10',
autovacuum_vacuum_cost_limit='1000'
);
-- Orders items per (tenant, queue) by priority descending, then id ascending.
CREATE INDEX v1_queue_item_list_idx ON v1_queue_item (
tenant_id ASC,
queue ASC,
priority DESC,
id ASC
);
-- Lookup of queue items by task attempt (task_id, task_inserted_at, retry_count).
CREATE INDEX v1_queue_item_task_idx ON v1_queue_item (
task_id ASC,
task_inserted_at ASC,
retry_count ASC
);
-- CreateTable
-- v1_task_runtime: one row per task attempt (task_id, task_inserted_at, retry_count),
-- carrying an optional worker_id and a mandatory timeout_at.
-- NOTE(review): presumably a row exists only while the attempt is running -- confirm.
CREATE TABLE v1_task_runtime (
task_id bigint NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
retry_count INTEGER NOT NULL,
worker_id UUID,
tenant_id UUID NOT NULL,
timeout_at TIMESTAMP(3) NOT NULL,
CONSTRAINT v1_task_runtime_pkey PRIMARY KEY (task_id, task_inserted_at, retry_count)
);
-- Partial index: only rows that have a worker assigned.
CREATE INDEX v1_task_runtime_tenantId_workerId_idx ON v1_task_runtime (tenant_id ASC, worker_id ASC) WHERE worker_id IS NOT NULL;
-- Supports scanning a tenant's runtimes in timeout order.
CREATE INDEX v1_task_runtime_tenantId_timeoutAt_idx ON v1_task_runtime (tenant_id ASC, timeout_at ASC);
-- Table-level autovacuum overrides for v1_task_runtime.
alter table v1_task_runtime set (
autovacuum_vacuum_scale_factor = '0.1',
autovacuum_analyze_scale_factor='0.05',
autovacuum_vacuum_threshold='25',
autovacuum_analyze_threshold='25',
autovacuum_vacuum_cost_delay='10',
autovacuum_vacuum_cost_limit='1000'
);
-- v1_rate_limited_queue_items represents a queue item that has been rate limited and removed from the v1_queue_item table.
-- Unlike v1_queue_item it has no surrogate id; the primary key is the task attempt.
CREATE TABLE v1_rate_limited_queue_items (
-- NOTE(review): presumably the earliest time the item may be moved back to v1_queue_item -- confirm
requeue_after TIMESTAMPTZ NOT NULL,
-- everything below this is the same as v1_queue_item
tenant_id UUID NOT NULL,
queue TEXT NOT NULL,
task_id bigint NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
external_id UUID NOT NULL,
action_id TEXT NOT NULL,
step_id UUID NOT NULL,
workflow_id UUID NOT NULL,
workflow_run_id UUID NOT NULL,
schedule_timeout_at TIMESTAMP(3),
step_timeout TEXT,
priority INTEGER NOT NULL DEFAULT 1,
sticky v1_sticky_strategy NOT NULL,
desired_worker_id UUID,
retry_count INTEGER NOT NULL DEFAULT 0,
CONSTRAINT v1_rate_limited_queue_items_pkey PRIMARY KEY (task_id, task_inserted_at, retry_count)
);
-- Supports scanning a (tenant, queue) in requeue_after order.
CREATE INDEX v1_rate_limited_queue_items_tenant_requeue_after_idx ON v1_rate_limited_queue_items (
tenant_id ASC,
queue ASC,
requeue_after ASC
);
-- Table-level autovacuum overrides for v1_rate_limited_queue_items.
alter table v1_rate_limited_queue_items set (
autovacuum_vacuum_scale_factor = '0.1',
autovacuum_analyze_scale_factor='0.05',
autovacuum_vacuum_threshold='25',
autovacuum_analyze_threshold='25',
autovacuum_vacuum_cost_delay='10',
autovacuum_vacuum_cost_limit='1000'
);
-- Kind of a v1_match; the signal_* and trigger_* column groups on v1_match are all nullable.
-- NOTE(review): presumably signal_* is filled for 'SIGNAL' and trigger_* for 'TRIGGER' -- confirm.
CREATE TYPE v1_match_kind AS ENUM ('TRIGGER', 'SIGNAL');
-- v1_match: a set of conditions (see v1_match_condition) awaiting satisfaction.
CREATE TABLE v1_match (
id bigint GENERATED ALWAYS AS IDENTITY,
tenant_id UUID NOT NULL,
kind v1_match_kind NOT NULL,
is_satisfied BOOLEAN NOT NULL DEFAULT FALSE,
-- existing_data is data from previous match conditions that we'd like to propagate when the
-- new match condition is met. this is used when this is a match created from a previous match, for
-- example when we've satisfied trigger conditions and would like to register durable sleep, user events
-- before triggering the DAG.
existing_data JSONB,
signal_task_id bigint,
signal_task_inserted_at timestamptz,
signal_external_id UUID,
signal_key TEXT,
-- references the parent DAG for the task, which we can use to get input + additional metadata
trigger_dag_id bigint,
trigger_dag_inserted_at timestamptz,
-- references the step id to instantiate the task
trigger_step_id UUID,
trigger_step_index BIGINT,
-- references the external id for the new task
trigger_external_id UUID,
trigger_workflow_run_id UUID,
trigger_parent_task_external_id UUID,
trigger_parent_task_id BIGINT,
trigger_parent_task_inserted_at timestamptz,
trigger_child_index BIGINT,
trigger_child_key TEXT,
-- references the existing task id, which may be set when we're replaying a task
trigger_existing_task_id bigint,
trigger_existing_task_inserted_at timestamptz,
trigger_priority integer,
CONSTRAINT v1_match_pkey PRIMARY KEY (id)
);
-- Source of the event a match condition waits for (see v1_match_condition.event_type).
CREATE TYPE v1_event_type AS ENUM ('USER', 'INTERNAL');
-- Provides information to the caller about the action to take. This is used to differentiate
-- negative conditions from positive conditions. For example, if a task is waiting for a set of
-- tasks to fail, the success of all tasks would be a CANCEL condition, and the failure of any
-- task would be a QUEUE condition. Different actions are implicitly different groups of conditions.
CREATE TYPE v1_match_condition_action AS ENUM ('CREATE', 'QUEUE', 'CANCEL', 'SKIP', 'CREATE_MATCH');
-- v1_match_condition: one condition belonging to a v1_match (v1_match_id).
CREATE TABLE v1_match_condition (
v1_match_id bigint NOT NULL,
id bigint GENERATED ALWAYS AS IDENTITY,
tenant_id UUID NOT NULL,
registered_at timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
event_type v1_event_type NOT NULL,
-- for INTERNAL events, this will correspond to a v1_task_event_type value
event_key TEXT NOT NULL,
event_resource_hint TEXT,
-- readable_data_key is used as the key when constructing the aggregated data for the v1_match
readable_data_key TEXT NOT NULL,
is_satisfied BOOLEAN NOT NULL DEFAULT FALSE,
action v1_match_condition_action NOT NULL DEFAULT 'QUEUE',
-- NOTE(review): presumably conditions sharing an or_group_id are OR-ed together -- confirm
or_group_id UUID NOT NULL,
expression TEXT,
data JSONB,
CONSTRAINT v1_match_condition_pkey PRIMARY KEY (v1_match_id, id)
);
-- v1_filter: a filter expression scoped to a workflow of a tenant.
CREATE TABLE v1_filter (
id UUID NOT NULL DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
workflow_id UUID NOT NULL,
scope TEXT NOT NULL,
expression TEXT NOT NULL,
payload JSONB NOT NULL DEFAULT '{}'::JSONB,
-- stored MD5 of the payload's text form; lets the unique index below cover the payload
-- through a short text value instead of the JSONB itself
payload_hash TEXT GENERATED ALWAYS AS (MD5(payload::TEXT)) STORED,
is_declarative BOOLEAN NOT NULL DEFAULT FALSE,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (tenant_id, id)
);
-- A tenant cannot have two filters with the same workflow, scope, expression and payload hash.
CREATE UNIQUE INDEX v1_filter_unique_tenant_workflow_id_scope_expression_payload ON v1_filter (
tenant_id ASC,
workflow_id ASC,
scope ASC,
expression ASC,
payload_hash
);
-- Authentication methods for incoming webhooks; each has its own auth__<method>__* columns below.
CREATE TYPE v1_incoming_webhook_auth_type AS ENUM ('BASIC', 'API_KEY', 'HMAC');
-- Hash algorithm and signature encoding options for the HMAC method.
CREATE TYPE v1_incoming_webhook_hmac_algorithm AS ENUM ('SHA1', 'SHA256', 'SHA512', 'MD5');
CREATE TYPE v1_incoming_webhook_hmac_encoding AS ENUM ('HEX', 'BASE64', 'BASE64URL');
-- Can add more sources in the future
CREATE TYPE v1_incoming_webhook_source_name AS ENUM ('GENERIC', 'GITHUB', 'STRIPE', 'SLACK', 'LINEAR');
-- v1_incoming_webhook: configuration of one incoming webhook per (tenant_id, name).
CREATE TABLE v1_incoming_webhook (
tenant_id UUID NOT NULL,
-- names are tenant-unique
name TEXT NOT NULL,
source_name v1_incoming_webhook_source_name NOT NULL,
-- CEL expression that creates an event key
-- from the payload of the webhook
event_key_expression TEXT NOT NULL,
auth_method v1_incoming_webhook_auth_type NOT NULL,
-- secrets are BYTEA; NOTE(review): presumably stored encrypted -- confirm
auth__basic__username TEXT,
auth__basic__password BYTEA,
auth__api_key__header_name TEXT,
auth__api_key__key BYTEA,
auth__hmac__algorithm v1_incoming_webhook_hmac_algorithm,
auth__hmac__encoding v1_incoming_webhook_hmac_encoding,
auth__hmac__signature_header_name TEXT,
auth__hmac__webhook_signing_secret BYTEA,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (tenant_id, name),
-- The columns belonging to the selected auth_method must all be non-null. Columns of
-- the other methods are not constrained by this check.
CHECK (
(
auth_method = 'BASIC'
AND (
auth__basic__username IS NOT NULL
AND auth__basic__password IS NOT NULL
)
)
OR
(
auth_method = 'API_KEY'
AND (
auth__api_key__header_name IS NOT NULL
AND auth__api_key__key IS NOT NULL
)
)
OR
(
auth_method = 'HMAC'
AND (
auth__hmac__algorithm IS NOT NULL
AND auth__hmac__encoding IS NOT NULL
AND auth__hmac__signature_header_name IS NOT NULL
AND auth__hmac__webhook_signing_secret IS NOT NULL
)
)
),
-- reject empty strings for the expression and the name
CHECK (LENGTH(event_key_expression) > 0),
CHECK (LENGTH(name) > 0)
);
-- Supports filtering v1_match_condition rows by tenant, event type/key, satisfaction state
-- and resource hint (in that column order).
CREATE INDEX v1_match_condition_filter_idx ON v1_match_condition (
tenant_id ASC,
event_type ASC,
event_key ASC,
is_satisfied ASC,
event_resource_hint ASC
);
-- v1_dag: one row per DAG. Range-partitioned on inserted_at, which is therefore part of
-- the primary key alongside id.
CREATE TABLE v1_dag (
id bigint GENERATED ALWAYS AS IDENTITY,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
tenant_id UUID NOT NULL,
external_id UUID NOT NULL,
display_name TEXT NOT NULL,
workflow_id UUID NOT NULL,
workflow_version_id UUID NOT NULL,
parent_task_external_id UUID,
CONSTRAINT v1_dag_pkey PRIMARY KEY (id, inserted_at)
) PARTITION BY RANGE(inserted_at);
-- v1_dag_to_task: links a DAG to its tasks. Rows are written by v1_task_insert_function
-- for tasks inserted with both dag_id and dag_inserted_at set.
CREATE TABLE v1_dag_to_task (
dag_id BIGINT NOT NULL,
dag_inserted_at TIMESTAMPTZ NOT NULL,
task_id BIGINT NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
CONSTRAINT v1_dag_to_task_pkey PRIMARY KEY (dag_id, dag_inserted_at, task_id, task_inserted_at)
);
-- v1_dag_data: input and additional metadata of a DAG, one row per (dag_id, dag_inserted_at).
-- Note the primary-key constraint is named v1_dag_input_pkey, not after the table.
CREATE TABLE v1_dag_data (
dag_id BIGINT NOT NULL,
dag_inserted_at TIMESTAMPTZ NOT NULL,
input JSONB NOT NULL,
additional_metadata JSONB,
CONSTRAINT v1_dag_input_pkey PRIMARY KEY (dag_id, dag_inserted_at)
);
-- CreateTable
-- v1_workflow_concurrency_slot: the parent (workflow-level) concurrency slot of a workflow run.
-- Rows are created by after_v1_concurrency_slot_insert_function and removed by
-- cleanup_workflow_concurrency_slots.
CREATE TABLE v1_workflow_concurrency_slot (
-- not generated here: copied from MAX(sort_id) of the inserted child v1_concurrency_slot rows
sort_id BIGINT NOT NULL,
tenant_id UUID NOT NULL,
workflow_id UUID NOT NULL,
workflow_version_id UUID NOT NULL,
workflow_run_id UUID NOT NULL,
-- id of the v1_workflow_concurrency strategy
strategy_id BIGINT NOT NULL,
-- DEPRECATED: this is no longer used to track progress of child strategy ids.
completed_child_strategy_ids BIGINT[],
child_strategy_ids BIGINT[],
priority INTEGER NOT NULL DEFAULT 1,
key TEXT NOT NULL,
is_filled BOOLEAN NOT NULL DEFAULT FALSE,
CONSTRAINT v1_workflow_concurrency_slot_pkey PRIMARY KEY (strategy_id, workflow_version_id, workflow_run_id)
);
-- Orders slots per (tenant, strategy, key) by priority descending, then sort_id ascending.
CREATE INDEX v1_workflow_concurrency_slot_query_idx ON v1_workflow_concurrency_slot (tenant_id, strategy_id ASC, key ASC, priority DESC, sort_id ASC);
-- CreateTable
-- v1_concurrency_slot: a task attempt's slot in one step-level concurrency strategy.
-- v1_task_insert_function inserts the first slot for new 'QUEUED' tasks that have a
-- concurrency strategy; insert and delete statements fire the after_v1_concurrency_slot_*
-- triggers.
CREATE TABLE v1_concurrency_slot (
sort_id BIGINT GENERATED ALWAYS AS IDENTITY,
task_id BIGINT NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
task_retry_count INTEGER NOT NULL,
external_id UUID NOT NULL,
tenant_id UUID NOT NULL,
workflow_id UUID NOT NULL,
workflow_version_id UUID NOT NULL,
workflow_run_id UUID NOT NULL,
-- id of the v1_step_concurrency strategy
strategy_id BIGINT NOT NULL,
-- id of the v1_workflow_concurrency strategy, when one exists
parent_strategy_id BIGINT,
priority INTEGER NOT NULL DEFAULT 1,
key TEXT NOT NULL,
is_filled BOOLEAN NOT NULL DEFAULT FALSE,
-- remaining strategies/keys of the task after this one (tails of the v1_task concurrency_* arrays)
next_parent_strategy_ids BIGINT[],
next_strategy_ids BIGINT[],
next_keys TEXT[],
-- filled from the task's queue by v1_task_insert_function
queue_to_notify TEXT NOT NULL,
schedule_timeout_at TIMESTAMP(3) NOT NULL,
CONSTRAINT v1_concurrency_slot_pkey PRIMARY KEY (task_id, task_inserted_at, task_retry_count, strategy_id)
);
-- Orders slots per (tenant, strategy, key) by sort_id ascending.
CREATE INDEX v1_concurrency_slot_query_idx ON v1_concurrency_slot (tenant_id, strategy_id ASC, key ASC, sort_id ASC);
-- When concurrency slot is CREATED, we should check whether the parent concurrency slot exists; if not, we should create
-- the parent concurrency slot as well.
--
-- Statement-level AFTER INSERT trigger function on v1_concurrency_slot. `new_table` is the
-- transition table holding every row inserted by the statement. Returns NULL, as the
-- result of a statement-level trigger is ignored.
CREATE OR REPLACE FUNCTION after_v1_concurrency_slot_insert_function()
RETURNS trigger AS $$
BEGIN
-- Step 1: for inserted slots that have a parent strategy, create one
-- v1_workflow_concurrency_slot per (parent strategy, workflow, version, run).
WITH parent_slot AS (
SELECT
*
FROM
new_table cs
WHERE
cs.parent_strategy_id IS NOT NULL
), parent_to_child_strategy_ids AS (
-- Collapse the child slots of one workflow run into a single parent row; sort_id,
-- priority and key each take the maximum value found among those child slots.
SELECT
wc.id AS parent_strategy_id,
wc.tenant_id,
ps.workflow_id,
ps.workflow_version_id,
ps.workflow_run_id,
MAX(ps.sort_id) AS sort_id,
MAX(ps.priority) AS priority,
MAX(ps.key) AS key,
-- NOTE(review): array_agg over an array-typed column produces an array of one higher
-- dimension -- confirm readers of child_strategy_ids expect that shape.
ARRAY_AGG(DISTINCT wc.child_strategy_ids) AS child_strategy_ids
FROM
parent_slot ps
JOIN v1_workflow_concurrency wc ON wc.workflow_id = ps.workflow_id AND wc.workflow_version_id = ps.workflow_version_id AND wc.id = ps.parent_strategy_id
GROUP BY
wc.id,
wc.tenant_id,
ps.workflow_id,
ps.workflow_version_id,
ps.workflow_run_id
)
INSERT INTO v1_workflow_concurrency_slot (
sort_id,
tenant_id,
workflow_id,
workflow_version_id,
workflow_run_id,
strategy_id,
child_strategy_ids,
priority,
key
)
SELECT
pcs.sort_id,
pcs.tenant_id,
pcs.workflow_id,
pcs.workflow_version_id,
pcs.workflow_run_id,
pcs.parent_strategy_id,
pcs.child_strategy_ids,
pcs.priority,
pcs.key
FROM
parent_to_child_strategy_ids pcs
-- an existing parent slot for the run is left untouched
ON CONFLICT (strategy_id, workflow_version_id, workflow_run_id) DO NOTHING;
-- If the v1_step_concurrency strategy is not active, we set it to active.
-- The inactive strategies are selected FOR UPDATE ordered by id before being updated.
WITH inactive_strategies AS (
SELECT
strategy.*
FROM
new_table cs
JOIN
v1_step_concurrency strategy ON strategy.workflow_id = cs.workflow_id AND strategy.workflow_version_id = cs.workflow_version_id AND strategy.id = cs.strategy_id
WHERE
strategy.is_active = FALSE
ORDER BY
strategy.id
FOR UPDATE
)
UPDATE v1_step_concurrency strategy
SET is_active = TRUE
FROM inactive_strategies
WHERE
strategy.workflow_id = inactive_strategies.workflow_id AND
strategy.workflow_version_id = inactive_strategies.workflow_version_id AND
strategy.step_id = inactive_strategies.step_id AND
strategy.id = inactive_strategies.id;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Fires once per INSERT statement, exposing the inserted rows as new_table.
CREATE TRIGGER after_v1_concurrency_slot_insert
AFTER INSERT ON v1_concurrency_slot
REFERENCING NEW TABLE AS new_table
FOR EACH STATEMENT
EXECUTE FUNCTION after_v1_concurrency_slot_insert_function();
-- Deletes the parent (workflow-level) concurrency slot identified by
-- (p_strategy_id, p_workflow_version_id, p_workflow_run_id) once it is no longer needed.
--
-- The slot is deleted when either
--   * the workflow run is a DAG (v1_lookup_table row with dag_id set), none of the DAG's
--     tasks still has a v1_concurrency_slot, and the DAG has at least as many tasks as
--     the slot has child strategy ids; or
--   * the workflow run is a single task (v1_lookup_table row with task_id set).
--
-- p_strategy_id:         id of the workflow-level strategy (slot's strategy_id).
-- p_workflow_version_id: workflow version of the slot.
-- p_workflow_run_id:     external id of the workflow run owning the slot.
CREATE OR REPLACE FUNCTION cleanup_workflow_concurrency_slots(
    p_strategy_id BIGINT,
    p_workflow_version_id UUID,
    p_workflow_run_id UUID
) RETURNS VOID AS $$
DECLARE
    -- Must be BIGINT: v1_workflow_concurrency_slot.sort_id is BIGINT, and assigning a
    -- value above the 32-bit range to an INTEGER variable raises "integer out of range".
    v_sort_id BIGINT;
BEGIN
    -- Get the sort_id for the specific workflow concurrency slot
    SELECT sort_id INTO v_sort_id
    FROM v1_workflow_concurrency_slot
    WHERE strategy_id = p_strategy_id
        AND workflow_version_id = p_workflow_version_id
        AND workflow_run_id = p_workflow_run_id;

    -- Acquire an advisory lock for the strategy ID
    -- There is a small chance of collisions but it's extremely unlikely
    PERFORM pg_advisory_xact_lock(1000000 * p_strategy_id + v_sort_id);

    WITH relevant_tasks_for_dags AS (
        -- All tasks of the DAG whose external id is the workflow run id
        SELECT
            t.id,
            t.inserted_at,
            t.retry_count
        FROM
            v1_task t
        JOIN
            v1_dag_to_task dt ON t.id = dt.task_id AND t.inserted_at = dt.task_inserted_at
        JOIN
            v1_lookup_table lt ON dt.dag_id = lt.dag_id AND dt.dag_inserted_at = lt.inserted_at
        WHERE
            lt.external_id = p_workflow_run_id
            AND lt.dag_id IS NOT NULL
    ), final_concurrency_slots_for_dags AS (
        SELECT
            wcs.strategy_id,
            wcs.workflow_version_id,
            wcs.workflow_run_id
        FROM
            v1_workflow_concurrency_slot wcs
        WHERE
            wcs.strategy_id = p_strategy_id
            AND wcs.workflow_version_id = p_workflow_version_id
            AND wcs.workflow_run_id = p_workflow_run_id
            AND NOT EXISTS (
                -- Check if any task in this DAG has a v1_concurrency_slot
                SELECT 1
                FROM relevant_tasks_for_dags rt
                WHERE EXISTS (
                    SELECT 1
                    FROM v1_concurrency_slot cs2
                    WHERE cs2.task_id = rt.id
                        AND cs2.task_inserted_at = rt.inserted_at
                        AND cs2.task_retry_count = rt.retry_count
                )
            )
            -- Only once the DAG has at least as many tasks as the slot has child strategies
            AND CARDINALITY(wcs.child_strategy_ids) <= (
                SELECT COUNT(*)
                FROM relevant_tasks_for_dags rt
            )
        GROUP BY
            wcs.strategy_id,
            wcs.workflow_version_id,
            wcs.workflow_run_id
    ), final_concurrency_slots_for_tasks AS (
        -- If the workflow run id corresponds to a single task, we can safely delete the workflow concurrency slot
        SELECT
            wcs.strategy_id,
            wcs.workflow_version_id,
            wcs.workflow_run_id
        FROM
            v1_workflow_concurrency_slot wcs
        JOIN
            v1_lookup_table lt ON wcs.workflow_run_id = lt.external_id AND lt.task_id IS NOT NULL
        WHERE
            wcs.strategy_id = p_strategy_id
            AND wcs.workflow_version_id = p_workflow_version_id
            AND wcs.workflow_run_id = p_workflow_run_id
    ), all_parent_slots_to_delete AS (
        SELECT
            strategy_id,
            workflow_version_id,
            workflow_run_id
        FROM
            final_concurrency_slots_for_dags
        UNION ALL
        SELECT
            strategy_id,
            workflow_version_id,
            workflow_run_id
        FROM
            final_concurrency_slots_for_tasks
    ), locked_parent_slots AS (
        -- Lock the candidate slots in primary-key order before deleting them
        SELECT
            wcs.strategy_id,
            wcs.workflow_version_id,
            wcs.workflow_run_id
        FROM
            v1_workflow_concurrency_slot wcs
        JOIN
            all_parent_slots_to_delete ps ON (wcs.strategy_id, wcs.workflow_version_id, wcs.workflow_run_id) = (ps.strategy_id, ps.workflow_version_id, ps.workflow_run_id)
        ORDER BY
            wcs.strategy_id,
            wcs.workflow_version_id,
            wcs.workflow_run_id
        FOR UPDATE
    )
    DELETE FROM
        v1_workflow_concurrency_slot wcs
    WHERE
        (strategy_id, workflow_version_id, workflow_run_id) IN (
            SELECT
                strategy_id,
                workflow_version_id,
                workflow_run_id
            FROM
                locked_parent_slots
        );
END;
$$ LANGUAGE plpgsql;
-- Statement-level AFTER DELETE trigger function on v1_concurrency_slot.
--
-- For every distinct (parent_strategy_id, workflow_version_id, workflow_run_id) among
-- the deleted rows that has a parent strategy, calls cleanup_workflow_concurrency_slots
-- so the parent slot can be removed once it is no longer needed. `deleted_rows` is the
-- transition table holding every row removed by the statement. Returns NULL, as the
-- result of a statement-level trigger is ignored.
CREATE OR REPLACE FUNCTION after_v1_concurrency_slot_delete_function()
RETURNS trigger AS $$
DECLARE
    rec RECORD;
BEGIN
    -- DISTINCT: several deleted slots of the same workflow run share one parent slot,
    -- so a single cleanup call per key is enough. The ORDER BY keeps the calls (and
    -- the locks they take) in a stable order across concurrent statements.
    FOR rec IN
        SELECT DISTINCT
            parent_strategy_id,
            workflow_version_id,
            workflow_run_id
        FROM deleted_rows
        WHERE parent_strategy_id IS NOT NULL
        ORDER BY parent_strategy_id, workflow_version_id, workflow_run_id
    LOOP
        PERFORM cleanup_workflow_concurrency_slots(
            rec.parent_strategy_id,
            rec.workflow_version_id,
            rec.workflow_run_id
        );
    END LOOP;

    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Fires once per DELETE statement, exposing the removed rows as deleted_rows.
CREATE TRIGGER after_v1_concurrency_slot_delete
AFTER DELETE ON v1_concurrency_slot
REFERENCING OLD TABLE AS deleted_rows
FOR EACH STATEMENT
EXECUTE FUNCTION after_v1_concurrency_slot_delete_function();
-- v1_retry_queue_item: one row per task attempt (task_id, task_inserted_at, task_retry_count)
-- with a retry_after timestamp.
-- NOTE(review): presumably the attempt is retried once retry_after has passed -- confirm.
CREATE TABLE v1_retry_queue_item (
task_id BIGINT NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
task_retry_count INTEGER NOT NULL,
retry_after TIMESTAMPTZ NOT NULL,
tenant_id UUID NOT NULL,
CONSTRAINT v1_retry_queue_item_pkey PRIMARY KEY (task_id, task_inserted_at, task_retry_count)
);
-- Supports scanning a tenant's retry items in retry_after order.
CREATE INDEX v1_retry_queue_item_tenant_id_retry_after_idx ON v1_retry_queue_item (tenant_id ASC, retry_after ASC);
-- Trigger function run after rows are inserted into v1_task. It reads the inserted rows
-- from `new_table` (presumably the transition table declared by the trigger that uses
-- this function -- that definition is not fully visible here) and fans them out:
--   1. 'QUEUED' tasks WITH a concurrency strategy get their first v1_concurrency_slot;
--   2. 'QUEUED' tasks WITHOUT a concurrency strategy go straight to v1_queue_item;
--   3. tasks belonging to a DAG are linked in v1_dag_to_task;
--   4. every task is registered in v1_lookup_table by external_id.
-- schedule_timeout is turned into an absolute time with convert_duration_to_interval,
-- a helper defined outside this section. Returns NULL.
CREATE OR REPLACE FUNCTION v1_task_insert_function()
RETURNS TRIGGER AS $$
BEGIN
    -- 1. First concurrency slot: element 1 of each concurrency_* array becomes the
    --    slot's own strategy/key; elements 2..n are carried along as next_* so later
    --    slots can be derived from them. A missing tail is stored as an empty array.
    WITH new_slot_rows AS (
        SELECT
            id,
            inserted_at,
            retry_count,
            tenant_id,
            priority,
            concurrency_parent_strategy_ids[1] AS parent_strategy_id,
            CASE
                WHEN array_length(concurrency_parent_strategy_ids, 1) > 1 THEN concurrency_parent_strategy_ids[2:array_length(concurrency_parent_strategy_ids, 1)]
                ELSE '{}'::bigint[]
            END AS next_parent_strategy_ids,
            concurrency_strategy_ids[1] AS strategy_id,
            external_id,
            workflow_run_id,
            CASE
                WHEN array_length(concurrency_strategy_ids, 1) > 1 THEN concurrency_strategy_ids[2:array_length(concurrency_strategy_ids, 1)]
                ELSE '{}'::bigint[]
            END AS next_strategy_ids,
            concurrency_keys[1] AS key,
            CASE
                WHEN array_length(concurrency_keys, 1) > 1 THEN concurrency_keys[2:array_length(concurrency_keys, 1)]
                ELSE '{}'::text[]
            END AS next_keys,
            workflow_id,
            workflow_version_id,
            queue,
            CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout) AS schedule_timeout_at
        FROM new_table
        WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NOT NULL
    )
    INSERT INTO v1_concurrency_slot (
        task_id,
        task_inserted_at,
        task_retry_count,
        external_id,
        tenant_id,
        workflow_id,
        workflow_version_id,
        workflow_run_id,
        parent_strategy_id,
        next_parent_strategy_ids,
        strategy_id,
        next_strategy_ids,
        priority,
        key,
        next_keys,
        queue_to_notify,
        schedule_timeout_at
    )
    SELECT
        id,
        inserted_at,
        retry_count,
        external_id,
        tenant_id,
        workflow_id,
        workflow_version_id,
        workflow_run_id,
        parent_strategy_id,
        next_parent_strategy_ids,
        strategy_id,
        next_strategy_ids,
        -- v1_task.priority is nullable, the slot's priority is not
        COALESCE(priority, 1),
        key,
        next_keys,
        queue,
        schedule_timeout_at
    FROM new_slot_rows;

    -- 2. Tasks without a concurrency strategy are queued directly.
    INSERT INTO v1_queue_item (
        tenant_id,
        queue,
        task_id,
        task_inserted_at,
        external_id,
        action_id,
        step_id,
        workflow_id,
        workflow_run_id,
        schedule_timeout_at,
        step_timeout,
        priority,
        sticky,
        desired_worker_id,
        retry_count
    )
    SELECT
        tenant_id,
        queue,
        id,
        inserted_at,
        external_id,
        action_id,
        step_id,
        workflow_id,
        workflow_run_id,
        CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout),
        step_timeout,
        -- v1_task.priority is nullable, the queue item's priority is not
        COALESCE(priority, 1),
        sticky,
        desired_worker_id,
        retry_count
    FROM new_table
    WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NULL;

    -- 3. Link DAG tasks to their DAG (regardless of initial_state).
    INSERT INTO v1_dag_to_task (
        dag_id,
        dag_inserted_at,
        task_id,
        task_inserted_at
    )
    SELECT
        dag_id,
        dag_inserted_at,
        id,
        inserted_at
    FROM new_table
    WHERE dag_id IS NOT NULL AND dag_inserted_at IS NOT NULL;

    -- 4. Register every task under its external id; an existing entry is kept as is.
    INSERT INTO v1_lookup_table (
        external_id,
        tenant_id,
        task_id,
        inserted_at
    )
    SELECT
        external_id,
        tenant_id,
        id,
        inserted_at
    FROM new_table
    ON CONFLICT (external_id) DO NOTHING;

    RETURN NULL;
END;
$$
LANGUAGE plpgsql;
-- Runs v1_task_insert_function once per INSERT statement on v1_task, exposing the
-- inserted rows as the transition table `new_table`.
CREATE TRIGGER v1_task_insert_trigger
AFTER INSERT ON v1_task
REFERENCING NEW TABLE AS new_table
FOR EACH STATEMENT
EXECUTE PROCEDURE v1_task_insert_function();
-- Statement-level trigger function for updates on v1_task. Compares each updated
-- row (`new_table`) with its previous version (`old_table`, matched on id) and
-- re-schedules QUEUED tasks:
--   * when a backoff factor is set and app_retry_count changed to a non-zero value,
--     the task is parked in v1_retry_queue_item until its computed retry_after;
--   * otherwise, when retry_count changed, the task is re-queued right away, either
--     through v1_concurrency_slot (it has a concurrency strategy) or v1_queue_item.
-- Immediate re-queues are written with priority 4.
CREATE OR REPLACE FUNCTION v1_task_update_function()
RETURNS TRIGGER AS
$$
BEGIN
    -- Backoff retries: park the task until retry_after.
    WITH retry_candidates AS (
        SELECT
            cur.id,
            cur.inserted_at,
            cur.retry_count,
            cur.tenant_id,
            -- retry_after = now + min(retry_max_backoff, retry_backoff_factor ^ app_retry_count) seconds
            NOW() + (LEAST(cur.retry_max_backoff, POWER(cur.retry_backoff_factor, cur.app_retry_count)) * interval '1 second') AS retry_after
        FROM new_table cur
        JOIN old_table prev ON prev.id = cur.id
        WHERE cur.initial_state = 'QUEUED'
            AND cur.retry_backoff_factor IS NOT NULL
            AND prev.app_retry_count IS DISTINCT FROM cur.app_retry_count
            AND cur.app_retry_count <> 0
    )
    INSERT INTO v1_retry_queue_item (
        task_id,
        task_inserted_at,
        task_retry_count,
        retry_after,
        tenant_id
    )
    SELECT
        rc.id,
        rc.inserted_at,
        rc.retry_count,
        rc.retry_after,
        rc.tenant_id
    FROM retry_candidates rc;

    -- Immediate retries for tasks with a concurrency strategy: reset the existing
    -- slot if there is one, otherwise create a fresh slot.
    WITH slot_candidates AS (
        SELECT
            cur.id,
            cur.inserted_at,
            cur.retry_count,
            cur.tenant_id,
            cur.workflow_run_id,
            cur.external_id,
            cur.concurrency_parent_strategy_ids[1] AS parent_strategy_id,
            CASE
                WHEN array_length(cur.concurrency_parent_strategy_ids, 1) > 1 THEN cur.concurrency_parent_strategy_ids[2:array_length(cur.concurrency_parent_strategy_ids, 1)]
                ELSE '{}'::bigint[]
            END AS next_parent_strategy_ids,
            cur.concurrency_strategy_ids[1] AS strategy_id,
            CASE
                WHEN array_length(cur.concurrency_strategy_ids, 1) > 1 THEN cur.concurrency_strategy_ids[2:array_length(cur.concurrency_strategy_ids, 1)]
                ELSE '{}'::bigint[]
            END AS next_strategy_ids,
            cur.concurrency_keys[1] AS key,
            CASE
                WHEN array_length(cur.concurrency_keys, 1) > 1 THEN cur.concurrency_keys[2:array_length(cur.concurrency_keys, 1)]
                ELSE '{}'::text[]
            END AS next_keys,
            cur.workflow_id,
            cur.workflow_version_id,
            cur.queue,
            CURRENT_TIMESTAMP + convert_duration_to_interval(cur.schedule_timeout) AS schedule_timeout_at
        FROM new_table cur
        JOIN old_table prev ON prev.id = cur.id
        WHERE cur.initial_state = 'QUEUED'
            -- Only tasks that have a first concurrency strategy take this path
            AND cur.concurrency_strategy_ids[1] IS NOT NULL
            -- Not a backoff retry (those are handled by the statement above)
            AND (cur.retry_backoff_factor IS NULL OR prev.app_retry_count IS NOT DISTINCT FROM cur.app_retry_count OR cur.app_retry_count = 0)
            AND prev.retry_count IS DISTINCT FROM cur.retry_count
    ), refreshed_slots AS (
        -- Reset slots that already exist for the task's first strategy
        UPDATE
            v1_concurrency_slot cs
        SET
            task_retry_count = sc.retry_count,
            schedule_timeout_at = sc.schedule_timeout_at,
            is_filled = FALSE,
            priority = 4
        FROM
            slot_candidates sc
        WHERE
            cs.task_id = sc.id
            AND cs.task_inserted_at = sc.inserted_at
            AND cs.strategy_id = sc.strategy_id
        RETURNING cs.*
    ), missing_slots AS (
        -- Candidates for which the UPDATE above touched no slot
        SELECT
            sc.*
        FROM
            slot_candidates sc
        LEFT JOIN
            refreshed_slots rs ON rs.task_id = sc.id AND rs.task_inserted_at = sc.inserted_at AND rs.strategy_id = sc.strategy_id
        WHERE
            rs.task_id IS NULL
    )
    INSERT INTO v1_concurrency_slot (
        task_id,
        task_inserted_at,
        task_retry_count,
        external_id,
        tenant_id,
        workflow_id,
        workflow_version_id,
        workflow_run_id,
        parent_strategy_id,
        next_parent_strategy_ids,
        strategy_id,
        next_strategy_ids,
        priority,
        key,
        next_keys,
        queue_to_notify,
        schedule_timeout_at
    )
    SELECT
        ms.id,
        ms.inserted_at,
        ms.retry_count,
        ms.external_id,
        ms.tenant_id,
        ms.workflow_id,
        ms.workflow_version_id,
        ms.workflow_run_id,
        ms.parent_strategy_id,
        ms.next_parent_strategy_ids,
        ms.strategy_id,
        ms.next_strategy_ids,
        4,
        ms.key,
        ms.next_keys,
        ms.queue,
        ms.schedule_timeout_at
    FROM missing_slots ms;

    -- Immediate retries for tasks without a concurrency strategy: straight to the queue.
    INSERT INTO v1_queue_item (
        tenant_id,
        queue,
        task_id,
        task_inserted_at,
        external_id,
        action_id,
        step_id,
        workflow_id,
        workflow_run_id,
        schedule_timeout_at,
        step_timeout,
        priority,
        sticky,
        desired_worker_id,
        retry_count
    )
    SELECT
        cur.tenant_id,
        cur.queue,
        cur.id,
        cur.inserted_at,
        cur.external_id,
        cur.action_id,
        cur.step_id,
        cur.workflow_id,
        cur.workflow_run_id,
        CURRENT_TIMESTAMP + convert_duration_to_interval(cur.schedule_timeout),
        cur.step_timeout,
        4,
        cur.sticky,
        cur.desired_worker_id,
        cur.retry_count
    FROM new_table cur
    JOIN old_table prev ON prev.id = cur.id
    WHERE cur.initial_state = 'QUEUED'
        AND cur.concurrency_strategy_ids[1] IS NULL
        AND (cur.retry_backoff_factor IS NULL OR prev.app_retry_count IS NOT DISTINCT FROM cur.app_retry_count OR cur.app_retry_count = 0)
        AND prev.retry_count IS DISTINCT FROM cur.retry_count;

    RETURN NULL;
END;
$$
LANGUAGE plpgsql;
-- Runs v1_task_update_function once per UPDATE statement on v1_task. The post- and
-- pre-update row images are exposed as the transition tables `new_table` and `old_table`.
CREATE TRIGGER v1_task_update_trigger
AFTER UPDATE ON v1_task
REFERENCING NEW TABLE AS new_table OLD TABLE AS old_table
FOR EACH STATEMENT
EXECUTE PROCEDURE v1_task_update_function();
-- Statement-level trigger function for deletes on v1_retry_queue_item. For every
-- deleted item whose retry_after has passed, looks up the matching v1_task row and,
-- if the task's initial_state is QUEUED, re-queues it with priority 4:
--   * tasks with a concurrency strategy get a new row in v1_concurrency_slot;
--   * tasks without one get a new row in v1_queue_item.
-- Deleted items whose retry_after is still in the future are ignored.
CREATE OR REPLACE FUNCTION v1_retry_queue_item_delete_function()
RETURNS TRIGGER AS
$$
BEGIN
    -- Due retries that go through concurrency control.
    WITH due_slot_rows AS (
        SELECT
            task.id,
            task.inserted_at,
            task.retry_count,
            task.tenant_id,
            task.workflow_run_id,
            task.external_id,
            task.concurrency_parent_strategy_ids[1] AS parent_strategy_id,
            CASE
                WHEN array_length(task.concurrency_parent_strategy_ids, 1) > 1 THEN task.concurrency_parent_strategy_ids[2:array_length(task.concurrency_parent_strategy_ids, 1)]
                ELSE '{}'::bigint[]
            END AS next_parent_strategy_ids,
            task.concurrency_strategy_ids[1] AS strategy_id,
            CASE
                WHEN array_length(task.concurrency_strategy_ids, 1) > 1 THEN task.concurrency_strategy_ids[2:array_length(task.concurrency_strategy_ids, 1)]
                ELSE '{}'::bigint[]
            END AS next_strategy_ids,
            task.concurrency_keys[1] AS key,
            CASE
                WHEN array_length(task.concurrency_keys, 1) > 1 THEN task.concurrency_keys[2:array_length(task.concurrency_keys, 1)]
                ELSE '{}'::text[]
            END AS next_keys,
            task.workflow_id,
            task.workflow_version_id,
            task.queue,
            CURRENT_TIMESTAMP + convert_duration_to_interval(task.schedule_timeout) AS schedule_timeout_at
        FROM deleted_rows removed
        JOIN
            v1_task task ON task.id = removed.task_id AND task.inserted_at = removed.task_inserted_at
        WHERE
            removed.retry_after <= NOW()
            AND task.initial_state = 'QUEUED'
            -- The task has a first concurrency strategy
            AND task.concurrency_strategy_ids[1] IS NOT NULL
    )
    INSERT INTO v1_concurrency_slot (
        task_id,
        task_inserted_at,
        task_retry_count,
        external_id,
        tenant_id,
        workflow_id,
        workflow_version_id,
        workflow_run_id,
        parent_strategy_id,
        next_parent_strategy_ids,
        strategy_id,
        next_strategy_ids,
        priority,
        key,
        next_keys,
        queue_to_notify,
        schedule_timeout_at
    )
    SELECT
        dsr.id,
        dsr.inserted_at,
        dsr.retry_count,
        dsr.external_id,
        dsr.tenant_id,
        dsr.workflow_id,
        dsr.workflow_version_id,
        dsr.workflow_run_id,
        dsr.parent_strategy_id,
        dsr.next_parent_strategy_ids,
        dsr.strategy_id,
        dsr.next_strategy_ids,
        4,
        dsr.key,
        dsr.next_keys,
        dsr.queue,
        dsr.schedule_timeout_at
    FROM due_slot_rows dsr;

    -- Due retries without a concurrency strategy go directly onto the queue.
    WITH due_tasks AS (
        SELECT
            task.*
        FROM
            deleted_rows removed
        JOIN v1_task task ON task.id = removed.task_id AND task.inserted_at = removed.task_inserted_at
        WHERE
            removed.retry_after <= NOW()
            AND task.initial_state = 'QUEUED'
            AND task.concurrency_strategy_ids[1] IS NULL
    )
    INSERT INTO v1_queue_item (
        tenant_id,
        queue,
        task_id,
        task_inserted_at,
        external_id,
        action_id,
        step_id,
        workflow_id,
        workflow_run_id,
        schedule_timeout_at,
        step_timeout,
        priority,
        sticky,
        desired_worker_id,
        retry_count
    )
    SELECT
        dt.tenant_id,
        dt.queue,
        dt.id,
        dt.inserted_at,
        dt.external_id,
        dt.action_id,
        dt.step_id,
        dt.workflow_id,
        dt.workflow_run_id,
        CURRENT_TIMESTAMP + convert_duration_to_interval(dt.schedule_timeout),
        dt.step_timeout,
        4,
        dt.sticky,
        dt.desired_worker_id,
        dt.retry_count
    FROM due_tasks dt;

    RETURN NULL;
END;
$$
LANGUAGE plpgsql;
-- Runs v1_retry_queue_item_delete_function once per DELETE statement on
-- v1_retry_queue_item, exposing the removed rows as the transition table `deleted_rows`.
CREATE TRIGGER v1_retry_queue_item_delete_trigger
AFTER DELETE ON v1_retry_queue_item
REFERENCING OLD TABLE AS deleted_rows
FOR EACH STATEMENT
EXECUTE PROCEDURE v1_retry_queue_item_delete_function();
-- Statement-level trigger function for updates on v1_concurrency_slot. Acts on slots
-- whose is_filled value changed to TRUE in this statement (old and new row images are
-- paired on task_id, task_inserted_at, task_retry_count and key):
--   * if the slot still lists further keys in next_keys, a slot for the next
--     strategy/key is inserted into v1_concurrency_slot;
--   * if next_keys is empty or NULL, the task is inserted into v1_queue_item.
-- Task-level values (priority, queue, timeouts, ...) are read from v1_task.
CREATE OR REPLACE FUNCTION v1_concurrency_slot_update_function()
RETURNS TRIGGER AS
$$
BEGIN
    -- Newly filled slots with remaining keys: advance to the next strategy/key.
    WITH next_slot_rows AS (
        SELECT
            task.id,
            task.inserted_at,
            task.retry_count,
            task.tenant_id,
            task.priority,
            task.queue,
            task.workflow_run_id,
            task.external_id,
            cur.next_parent_strategy_ids[1] AS parent_strategy_id,
            CASE
                WHEN array_length(cur.next_parent_strategy_ids, 1) > 1 THEN cur.next_parent_strategy_ids[2:array_length(cur.next_parent_strategy_ids, 1)]
                ELSE '{}'::bigint[]
            END AS next_parent_strategy_ids,
            cur.next_strategy_ids[1] AS strategy_id,
            CASE
                WHEN array_length(cur.next_strategy_ids, 1) > 1 THEN cur.next_strategy_ids[2:array_length(cur.next_strategy_ids, 1)]
                ELSE '{}'::bigint[]
            END AS next_strategy_ids,
            cur.next_keys[1] AS key,
            CASE
                WHEN array_length(cur.next_keys, 1) > 1 THEN cur.next_keys[2:array_length(cur.next_keys, 1)]
                ELSE '{}'::text[]
            END AS next_keys,
            task.workflow_id,
            task.workflow_version_id,
            CURRENT_TIMESTAMP + convert_duration_to_interval(task.schedule_timeout) AS schedule_timeout_at
        FROM new_table cur
        JOIN old_table prev
            ON prev.task_id = cur.task_id
            AND prev.task_inserted_at = cur.task_inserted_at
            AND prev.task_retry_count = cur.task_retry_count
            AND prev.key = cur.key
        JOIN v1_task task ON task.id = cur.task_id AND task.inserted_at = cur.task_inserted_at
        WHERE
            COALESCE(array_length(cur.next_keys, 1), 0) != 0
            AND cur.is_filled = TRUE
            AND cur.is_filled IS DISTINCT FROM prev.is_filled
    )
    INSERT INTO v1_concurrency_slot (
        task_id,
        task_inserted_at,
        task_retry_count,
        external_id,
        tenant_id,
        workflow_id,
        workflow_version_id,
        workflow_run_id,
        parent_strategy_id,
        next_parent_strategy_ids,
        strategy_id,
        next_strategy_ids,
        priority,
        key,
        next_keys,
        schedule_timeout_at,
        queue_to_notify
    )
    SELECT
        nsr.id,
        nsr.inserted_at,
        nsr.retry_count,
        nsr.external_id,
        nsr.tenant_id,
        nsr.workflow_id,
        nsr.workflow_version_id,
        nsr.workflow_run_id,
        nsr.parent_strategy_id,
        nsr.next_parent_strategy_ids,
        nsr.strategy_id,
        nsr.next_strategy_ids,
        COALESCE(nsr.priority, 1),
        nsr.key,
        nsr.next_keys,
        nsr.schedule_timeout_at,
        nsr.queue
    FROM next_slot_rows nsr;

    -- Newly filled slots with no remaining keys: hand the task to the queue.
    WITH ready_tasks AS (
        SELECT
            task.*
        FROM
            new_table cur
        JOIN old_table prev
            ON prev.task_id = cur.task_id
            AND prev.task_inserted_at = cur.task_inserted_at
            AND prev.task_retry_count = cur.task_retry_count
            AND prev.key = cur.key
        JOIN v1_task task ON task.id = cur.task_id AND task.inserted_at = cur.task_inserted_at
        WHERE
            COALESCE(array_length(cur.next_keys, 1), 0) = 0
            AND cur.is_filled = TRUE
            AND cur.is_filled IS DISTINCT FROM prev.is_filled
    )
    INSERT INTO v1_queue_item (
        tenant_id,
        queue,
        task_id,
        task_inserted_at,
        external_id,
        action_id,
        step_id,
        workflow_id,
        workflow_run_id,
        schedule_timeout_at,
        step_timeout,
        priority,
        sticky,
        desired_worker_id,
        retry_count
    )
    SELECT
        rt.tenant_id,
        rt.queue,
        rt.id,
        rt.inserted_at,
        rt.external_id,
        rt.action_id,
        rt.step_id,
        rt.workflow_id,
        rt.workflow_run_id,
        CURRENT_TIMESTAMP + convert_duration_to_interval(rt.schedule_timeout),
        rt.step_timeout,
        COALESCE(rt.priority, 1),
        rt.sticky,
        rt.desired_worker_id,
        rt.retry_count
    FROM ready_tasks rt;

    RETURN NULL;
END;
$$
LANGUAGE plpgsql;
-- Runs v1_concurrency_slot_update_function once per UPDATE statement on
-- v1_concurrency_slot. The post- and pre-update row images are exposed as the
-- transition tables `new_table` and `old_table`.
CREATE TRIGGER v1_concurrency_slot_update_trigger
AFTER UPDATE ON v1_concurrency_slot
REFERENCING NEW TABLE AS new_table OLD TABLE AS old_table
FOR EACH STATEMENT
EXECUTE PROCEDURE v1_concurrency_slot_update_function();
-- Statement-level trigger function for inserts into v1_dag. Registers every
-- inserted DAG (read from the `new_table` transition table) in v1_lookup_table
-- under its external_id; an external_id that is already present is left as is.
CREATE OR REPLACE FUNCTION v1_dag_insert_function()
RETURNS TRIGGER AS
$$
BEGIN
    INSERT INTO v1_lookup_table (
        external_id,
        tenant_id,
        dag_id,
        inserted_at
    )
    SELECT
        new_dag.external_id,
        new_dag.tenant_id,
        new_dag.id,
        new_dag.inserted_at
    FROM new_table new_dag
    ON CONFLICT (external_id) DO NOTHING;

    RETURN NULL;
END;
$$
LANGUAGE plpgsql;
-- Runs v1_dag_insert_function once per INSERT statement on v1_dag, exposing the
-- inserted rows as the transition table `new_table`.
CREATE TRIGGER v1_dag_insert_trigger
AFTER INSERT ON v1_dag
REFERENCING NEW TABLE AS new_table
FOR EACH STATEMENT
EXECUTE PROCEDURE v1_dag_insert_function();
-- Severity levels a log line can carry.
CREATE TYPE v1_log_line_level AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR');
-- Log lines recorded against a task, identified by (task_id, task_inserted_at).
-- Range-partitioned on task_inserted_at, which is why that column is part of the primary key.
CREATE TABLE v1_log_line (
id BIGINT GENERATED ALWAYS AS IDENTITY,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
tenant_id UUID NOT NULL,
task_id BIGINT NOT NULL,
task_inserted_at TIMESTAMPTZ NOT NULL,
message TEXT NOT NULL,
-- Defaults to INFO when the writer does not supply a level
level v1_log_line_level NOT NULL DEFAULT 'INFO',
-- Optional JSON attached to the line
metadata JSONB,
-- NOTE(review): presumably the task attempt that produced the line - confirm against the writer
retry_count INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (task_id, task_inserted_at, id)
) PARTITION BY RANGE(task_inserted_at);
-- Kinds of match condition that can be attached to a step; each kind has a
-- dedicated nullable column on v1_step_match_condition below.
CREATE TYPE v1_step_match_condition_kind AS ENUM ('PARENT_OVERRIDE', 'USER_EVENT', 'SLEEP');
-- Match conditions declared per step. Keyed by (step_id, id).
CREATE TABLE v1_step_match_condition (
id BIGINT GENERATED ALWAYS AS IDENTITY,
tenant_id UUID NOT NULL,
step_id UUID NOT NULL,
readable_data_key TEXT NOT NULL,
-- Uses the v1_match_condition_action enum (declared elsewhere in this file); defaults to CREATE
action v1_match_condition_action NOT NULL DEFAULT 'CREATE',
-- NOTE(review): presumably conditions sharing an or_group_id are OR-ed together - confirm
or_group_id UUID NOT NULL,
expression TEXT,
kind v1_step_match_condition_kind NOT NULL,
-- If this is a SLEEP condition, this will be set to the sleep duration
sleep_duration TEXT,
-- If this is a USER_EVENT condition, this will be set to the user event key
event_key TEXT,
-- If this is a PARENT_OVERRIDE condition, this will be set to the parent readable_id
parent_readable_id TEXT,
PRIMARY KEY (step_id, id)
);
-- Durable sleeps per tenant. The primary key leads with (tenant_id, sleep_until),
-- so a tenant's sleeps can be read in wake-up order straight from the key.
CREATE TABLE v1_durable_sleep (
id BIGINT GENERATED ALWAYS AS IDENTITY,
tenant_id UUID NOT NULL,
-- Timestamp stored alongside the textual sleep_duration below
sleep_until TIMESTAMPTZ NOT NULL,
-- Duration kept as text; NOTE(review): format not visible here - confirm against the writer
sleep_duration TEXT NOT NULL,
PRIMARY KEY (tenant_id, sleep_until, id)
);
-- What a stored payload belongs to.
CREATE TYPE v1_payload_type AS ENUM ('TASK_INPUT', 'DAG_INPUT', 'TASK_OUTPUT', 'TASK_EVENT_DATA');
-- IMPORTANT: Keep these values in sync with `v1_payload_type_olap` in the OLAP db
-- NOTE(review): the line above names `v1_payload_type_olap` but sits directly above
-- `v1_payload_location`; confirm which OLAP enum(s) must mirror v1_payload_type and
-- v1_payload_location, and move or duplicate the reminder accordingly.
CREATE TYPE v1_payload_location AS ENUM ('INLINE', 'EXTERNAL');
-- Payload storage. A payload is identified by (tenant_id, inserted_at, id, type) and
-- either holds its content inline or points at an external location.
-- Range-partitioned on inserted_at.
CREATE TABLE v1_payload (
tenant_id UUID NOT NULL,
id BIGINT NOT NULL,
inserted_at TIMESTAMPTZ NOT NULL,
external_id UUID,
type v1_payload_type NOT NULL,
location v1_payload_location NOT NULL,
-- Key of the payload in external storage; required when location = 'EXTERNAL'
external_location_key TEXT,
-- Payload body; must be NULL when location = 'EXTERNAL'
inline_content JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (tenant_id, inserted_at, id, type),
-- EXTERNAL rows must carry a location key and no inline content.
-- INLINE rows are not restricted by this check.
CHECK (
location = 'INLINE'
OR
(location = 'EXTERNAL' AND inline_content IS NULL AND external_location_key IS NOT NULL)
)
) PARTITION BY RANGE(inserted_at);
-- Operations that can be recorded against a payload in v1_payload_wal.
CREATE TYPE v1_payload_wal_operation AS ENUM ('CREATE', 'UPDATE', 'DELETE');

-- Pending payload operations, one row per (payload key, offload_at).
-- (payload_id, payload_inserted_at, payload_type, tenant_id) mirror the primary key
-- columns of v1_payload. Hash-partitioned on tenant_id.
-- NOTE(review): presumably drained by a poller once offload_at has passed - confirm.
CREATE TABLE v1_payload_wal (
    tenant_id UUID NOT NULL,
    offload_at TIMESTAMPTZ NOT NULL,
    payload_id BIGINT NOT NULL,
    payload_inserted_at TIMESTAMPTZ NOT NULL,
    payload_type v1_payload_type NOT NULL,
    operation v1_payload_wal_operation NOT NULL DEFAULT 'CREATE',

    -- todo: we probably should shuffle this index around - it'd make more sense now for the order to be
    -- (tenant_id, offload_at, payload_id, payload_inserted_at, payload_type)
    -- so we can filter by the tenant id first, then order by offload_at
    PRIMARY KEY (offload_at, tenant_id, payload_id, payload_inserted_at, payload_type)
) PARTITION BY HASH (tenant_id);

-- Finds the WAL rows for a given payload.
CREATE INDEX v1_payload_wal_payload_lookup_idx ON v1_payload_wal (payload_id, payload_inserted_at, payload_type, tenant_id);

-- Supports reading a tenant's rows in offload_at order.
-- Built without CONCURRENTLY on purpose: PostgreSQL does not support concurrent index
-- builds on partitioned tables, CONCURRENTLY cannot run inside a transaction block,
-- and the table is empty at this point so a plain build takes no meaningful lock time.
CREATE INDEX v1_payload_wal_poll_idx ON v1_payload_wal (tenant_id, offload_at);

-- Create the 4 hash partitions of v1_payload_wal.
SELECT create_v1_hash_partitions('v1_payload_wal'::TEXT, 4);
-- Queue of payloads scheduled for cut-over at cut_over_at.
-- (payload_id, payload_inserted_at, payload_type, tenant_id) mirror the primary key
-- columns of v1_payload. Hash-partitioned on tenant_id.
CREATE TABLE v1_payload_cutover_queue_item (
tenant_id UUID NOT NULL,
-- Items are considered due once cut_over_at <= NOW()
-- (see find_matching_tenants_in_payload_cutover_queue_item_partition)
cut_over_at TIMESTAMPTZ NOT NULL,
payload_id BIGINT NOT NULL,
payload_inserted_at TIMESTAMPTZ NOT NULL,
payload_type v1_payload_type NOT NULL,
PRIMARY KEY (cut_over_at, tenant_id, payload_id, payload_inserted_at, payload_type)
) PARTITION BY HASH (tenant_id);
-- Create the 4 hash partitions of v1_payload_cutover_queue_item.
SELECT create_v1_hash_partitions('v1_payload_cutover_queue_item'::TEXT, 4);
-- Returns the distinct tenant ids that have at least one due row (offload_at <= NOW())
-- in a single hash partition of v1_payload_wal.
--
-- Parameters:
--   partition_number - suffix of the partition table, i.e. v1_payload_wal_<partition_number>
-- Returns:
--   UUID[] of tenant ids; an empty array when no row in the partition is due.
--
-- The due-time filter mirrors find_matching_tenants_in_payload_cutover_queue_item_partition,
-- so tenants whose only WAL rows are scheduled in the future are not reported.
-- NOTE(review): assumes callers only act on rows with offload_at <= NOW() - confirm.
CREATE OR REPLACE FUNCTION find_matching_tenants_in_payload_wal_partition(
    partition_number INT
) RETURNS UUID[]
LANGUAGE plpgsql AS
$$
DECLARE
    partition_table text;
    result UUID[];
BEGIN
    partition_table := 'v1_payload_wal_' || partition_number::text;

    -- %I quotes the partition name as an identifier, so the dynamic SQL stays safe.
    EXECUTE format(
        'SELECT ARRAY(
            SELECT DISTINCT e.tenant_id
            FROM %I e
            WHERE e.offload_at <= NOW()
        )',
        partition_table)
    INTO result;

    RETURN result;
END;
$$;
-- Returns the distinct tenant ids that have at least one due row (cut_over_at <= NOW())
-- in a single hash partition of v1_payload_cutover_queue_item.
--
-- Parameters:
--   partition_number - suffix of the partition table,
--                      i.e. v1_payload_cutover_queue_item_<partition_number>
-- Returns:
--   UUID[] of tenant ids; an empty array when no row in the partition is due.
CREATE OR REPLACE FUNCTION find_matching_tenants_in_payload_cutover_queue_item_partition(
    partition_number INT
) RETURNS UUID[]
LANGUAGE plpgsql AS
$$
DECLARE
    target_partition text := 'v1_payload_cutover_queue_item_' || partition_number::text;
    tenant_query text;
    due_tenant_ids UUID[];
BEGIN
    -- %I quotes the partition name as an identifier.
    -- The query literal is kept flush-left so the generated SQL text is unchanged.
    tenant_query := format(
'SELECT ARRAY(
SELECT DISTINCT e.tenant_id
FROM %I e
WHERE e.cut_over_at <= NOW()
)',
        target_partition);

    EXECUTE tenant_query INTO due_tenant_ids;

    RETURN due_tenant_ids;
END;
$$;
-- Idempotency keys per tenant, each with an expiry and an optional claimant.
CREATE TABLE v1_idempotency_key (
tenant_id UUID NOT NULL,
key TEXT NOT NULL,
expires_at TIMESTAMPTZ NOT NULL,
-- NULL until set; NOTE(review): presumably the external id of the run that claimed the key - confirm
claimed_by_external_id UUID,
inserted_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- expires_at sits in the middle of the key, so a tenant's keys are ordered by expiry
PRIMARY KEY (tenant_id, expires_at, key)
);
-- The primary key includes expires_at, so uniqueness of a key within a tenant is
-- enforced separately by this index.
CREATE UNIQUE INDEX v1_idempotency_key_unique_tenant_key ON v1_idempotency_key (tenant_id, key);
-- v1_operation_interval_settings represents the interval settings for a specific tenant. "Operation" means
-- any sort of tenant-specific polling-based operation on the engine, like timeouts, reassigns, etc.
-- There is at most one row per (tenant_id, operation_id).
CREATE TABLE v1_operation_interval_settings (
tenant_id UUID NOT NULL,
-- Identifier of the operation the interval applies to
operation_id TEXT NOT NULL,
-- The interval represents a Go time.Duration, hence the nanoseconds
interval_nanoseconds BIGINT NOT NULL,
PRIMARY KEY (tenant_id, operation_id)
);