Feat: Payload Store Repository (#2047)

* feat: add table for storing payloads

* feat: add payload type enum

* feat: gen sqlc

* feat: initial sql impl

* feat: add payload store repo to shared

* feat: add overwrite

* fix: impl

* feat: bulk op

* feat: initial wiring of inputs for task triggers

* feat: wire up dag matches

* feat: create V1TaskWithPayload and use it everywhere

* fix: couple bugs

* fix: clean up types

* fix: overwrite

* fix: rm input from replay

* fix: move payload store to shared repo

* fix: schema

* refactor: repo setup

* refactor: repos

* fix: gen

* chore: lint

* fix: rename

* feat: naming, write dag inputs

* fix: more naming, trigger bug

* fix: dual writes for now

* fix: pass in tx

* feat: initial work on offloader

* feat: improve external offloader

* fix: some refs

* add withExternalHandler

* fix: improve impl of external store

* feat: implement offloading, fix other impls

* feat: add query to update JSON

* fix: implement offloading + updating records in payloads table

* feat: add WAL table

* feat: add queries for polling WAL and evicting

* feat: wire up writes into WAL

* fix: get job working

* refactor: improve types

* fix: infinite loop

* feat: improve offloading logic to run in two separate txes

* refactor: rework how overrides work

* fix: lint

* fix: migration number

* fix: migration

* fix: migration version

* fix: revert back to reading payloads out

* fix: fall back to previous input, part i

* fix: input fallback

* fix: add back input to replay

* fix: input fallback in dispatcher

* fix: nil check

* feat: advisory locks, part i

* fix: no skip locked

* feat: hash partitioned wal table

* fix: modify queries a bit, tweak crud enum

* fix: pk order, function to find tenants

* feat: wal processing

* fix: only write wal if an external store is enabled, fix offloading logic

* fix: spacing

* feat: schema cleanup

* fix: rm external store loc name

* fix: set content to null when offloading

* fix: cleanup, naming

* fix: pass overwrite payload store along

* debug: add some logging

* Revert "debug: add some logging"

This reverts commit 43e71eadf1.

* fix: typo

* fx: add offloatAt to store opts for offloading

* fix: handle leasing with advisory lock

* fix: struct def

* fix: requeue on payloads not found

* fix: rm hack for triggers

* fix: revert empty input on write

* fix: write input

* feat: env var for enabling / disabling dual writes

* feat: wire up dual writes

* fix: comments

* feat: generics!

* fix: panic from type cast

* fix: migration

* fix: generic

* fix: hack for T key in map

* fix: cleanup
This commit is contained in:
matt
2025-09-12 09:53:01 -04:00
committed by GitHub
parent f385964fcc
commit 92843bb277
32 changed files with 1682 additions and 157 deletions
+62 -1
View File
@@ -58,7 +58,7 @@ BEGIN
END IF;
EXECUTE
format('CREATE TABLE %s (LIKE %s INCLUDING INDEXES)', newTableName, targetTableName);
format('CREATE TABLE %s (LIKE %s INCLUDING INDEXES INCLUDING CONSTRAINTS)', newTableName, targetTableName);
EXECUTE
format('ALTER TABLE %s SET (
autovacuum_vacuum_scale_factor = ''0.1'',
@@ -1629,6 +1629,67 @@ CREATE TABLE v1_durable_sleep (
PRIMARY KEY (tenant_id, sleep_until, id)
);
CREATE TYPE v1_payload_type AS ENUM ('TASK_INPUT', 'DAG_INPUT', 'TASK_OUTPUT');
CREATE TYPE v1_payload_location AS ENUM ('INLINE', 'EXTERNAL');
CREATE TABLE v1_payload (
tenant_id UUID NOT NULL,
id BIGINT NOT NULL,
inserted_at TIMESTAMPTZ NOT NULL,
type v1_payload_type NOT NULL,
location v1_payload_location NOT NULL,
external_location_key TEXT,
inline_content JSONB,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (tenant_id, inserted_at, id, type),
CHECK (
(location = 'INLINE' AND inline_content IS NOT NULL AND external_location_key IS NULL)
OR
(location = 'EXTERNAL' AND inline_content IS NULL AND external_location_key IS NOT NULL)
)
) PARTITION BY RANGE(inserted_at);
CREATE TYPE v1_payload_wal_operation AS ENUM ('CREATE', 'UPDATE', 'DELETE');
CREATE TABLE v1_payload_wal (
tenant_id UUID NOT NULL,
offload_at TIMESTAMPTZ NOT NULL,
payload_id BIGINT NOT NULL,
payload_inserted_at TIMESTAMPTZ NOT NULL,
payload_type v1_payload_type NOT NULL,
operation v1_payload_wal_operation NOT NULL,
PRIMARY KEY (offload_at, tenant_id, payload_id, payload_inserted_at, payload_type),
CONSTRAINT "v1_payload_wal_payload" FOREIGN KEY (payload_id, payload_inserted_at, payload_type, tenant_id) REFERENCES v1_payload (id, inserted_at, type, tenant_id) ON DELETE CASCADE
) PARTITION BY HASH (tenant_id);
SELECT create_v1_hash_partitions('v1_payload_wal'::TEXT, 4);
CREATE OR REPLACE FUNCTION find_matching_tenants_in_payload_wal_partition(
partition_number INT
) RETURNS UUID[]
LANGUAGE plpgsql AS
$$
DECLARE
partition_table text;
result UUID[];
BEGIN
partition_table := 'v1_payload_wal_' || partition_number::text;
EXECUTE format(
'SELECT ARRAY(
SELECT DISTINCT e.tenant_id
FROM %I e
WHERE e.offload_at < NOW()
)',
partition_table)
INTO result;
RETURN result;
END;
$$;
CREATE TABLE v1_idempotency_key (
tenant_id UUID NOT NULL,