Feat: Job for payload cutovers to external (#2586)

* feat: initial payload cutover job

* refactor: fix a couple things

* feat: start wiring up writes

* feat: only run job if external store is enabled

* fix: add some notes, add loop

* feat: function for reading out payloads

* fix: date handling, logging

* feat: remove wal and immediate offloads

* feat: advisory lock

* feat: partition swap logic

* fix: rm debug

* fix: add todo

* fix: sql cleanup

* fix: sql cleanup, ii

* chore: nuke a bunch of WAL stuff

* chore: more wal

* feat: trigger for crud opts

* feat: drop trigger + function in swapover

* feat: move autovac to later

* feat: use unlogged table initially

* feat: update migration

* fix: drop trigger

* fix: use insert + on conflict

* fix: types

* refactor: clean up a bit

* fix: panic

* fix: detach partition before dropping

* feat: configurable batch size

* feat: offset tracking in the db

* feat: explicitly lock

* fix: down migration

* fix: bug

* fix: offset handling

* fix: try explicit ordering of the insert

* fix: lock location

* fix: do less stuff after locking

* fix: ordering

* fix: dont drop and recreate if temp table exists

* fix: explicitly track completed status

* fix: table name

* fix: dont use unlogged table

* fix: rm todos

* chore: lint

* feat: configurable delay

* fix: use date as pk instead of varchar

* fix: daily job

* fix: hack check constraint to speed up partition attach

* fix: syntax

* fix: syntax

* fix: drop constraint after attaching

* fix: syntax

* fix: drop triggers properly

* fix: factor out insert logic

* refactor: factor out loop logic

* refactor: factor out job preparation work

* fix: ordering

* fix: run the job more often

* fix: use `WithSingletonMode`

* fix: singleton mode sig

* fix: env var cleanup

* fix: overwrite sig

* fix: re-enable immediate offloads with a flag

* fix: order, offload at logic

* feat: add count query to compare

* fix: row-level triggers, partition time bug

* fix: rm todo

* fix: for true

* fix: handle lock not acquired

* fix: handle error

* fix: comment
matt
2025-12-05 10:54:26 -05:00
committed by GitHub
parent 8be9d21ec2
commit 18940869ae
12 changed files with 1230 additions and 491 deletions


@@ -0,0 +1,258 @@
-- +goose Up
-- +goose StatementBegin
CREATE TABLE v1_payload_cutover_job_offset (
key DATE PRIMARY KEY,
last_offset BIGINT NOT NULL,
is_completed BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE OR REPLACE FUNCTION copy_v1_payload_partition_structure(
partition_date date
) RETURNS text
LANGUAGE plpgsql AS
$$
DECLARE
partition_date_str varchar;
source_partition_name varchar;
target_table_name varchar;
trigger_function_name varchar;
trigger_name varchar;
partition_start date;
partition_end date;
BEGIN
SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
SELECT format('v1_payload_offload_tmp_%s', partition_date_str) INTO target_table_name;
SELECT format('sync_to_%s', target_table_name) INTO trigger_function_name;
SELECT format('trigger_sync_to_%s', target_table_name) INTO trigger_name;
partition_start := partition_date;
partition_end := partition_date + INTERVAL '1 day';
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
RAISE EXCEPTION 'Source partition % does not exist', source_partition_name;
END IF;
IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = target_table_name) THEN
RAISE NOTICE 'Target table % already exists, skipping creation', target_table_name;
RETURN target_table_name;
END IF;
EXECUTE format(
'CREATE TABLE %I (LIKE %I INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING INDEXES)',
target_table_name,
source_partition_name
);
EXECUTE format('
ALTER TABLE %I
ADD CONSTRAINT %I
CHECK (
inserted_at IS NOT NULL
AND inserted_at >= %L::TIMESTAMPTZ
AND inserted_at < %L::TIMESTAMPTZ
)
',
target_table_name,
target_table_name || '_iat_chk_bounds',
partition_start,
partition_end
);
EXECUTE format('
CREATE OR REPLACE FUNCTION %I() RETURNS trigger
LANGUAGE plpgsql AS $func$
BEGIN
IF TG_OP = ''INSERT'' THEN
INSERT INTO %I (tenant_id, id, inserted_at, external_id, type, location, external_location_key, inline_content, updated_at)
VALUES (NEW.tenant_id, NEW.id, NEW.inserted_at, NEW.external_id, NEW.type, NEW.location, NEW.external_location_key, NEW.inline_content, NEW.updated_at)
ON CONFLICT (tenant_id, id, inserted_at, type) DO UPDATE
SET
location = EXCLUDED.location,
external_location_key = EXCLUDED.external_location_key,
inline_content = EXCLUDED.inline_content,
updated_at = EXCLUDED.updated_at;
RETURN NEW;
ELSIF TG_OP = ''UPDATE'' THEN
UPDATE %I
SET
location = NEW.location,
external_location_key = NEW.external_location_key,
inline_content = NEW.inline_content,
updated_at = NEW.updated_at
WHERE
tenant_id = NEW.tenant_id
AND id = NEW.id
AND inserted_at = NEW.inserted_at
AND type = NEW.type;
RETURN NEW;
ELSIF TG_OP = ''DELETE'' THEN
DELETE FROM %I
WHERE
tenant_id = OLD.tenant_id
AND id = OLD.id
AND inserted_at = OLD.inserted_at
AND type = OLD.type;
RETURN OLD;
END IF;
RETURN NULL;
END;
$func$;
', trigger_function_name, target_table_name, target_table_name, target_table_name);
EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I', trigger_name, source_partition_name);
EXECUTE format('
CREATE TRIGGER %I
AFTER INSERT OR UPDATE OR DELETE ON %I
FOR EACH ROW
EXECUTE FUNCTION %I();
', trigger_name, source_partition_name, trigger_function_name);
RAISE NOTICE 'Created table % as a copy of partition % with sync trigger', target_table_name, source_partition_name;
RETURN target_table_name;
END;
$$;
CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
partition_date date,
limit_param int,
offset_param bigint
) RETURNS TABLE (
tenant_id UUID,
id BIGINT,
inserted_at TIMESTAMPTZ,
external_id UUID,
type v1_payload_type,
location v1_payload_location,
external_location_key TEXT,
inline_content JSONB,
updated_at TIMESTAMPTZ
)
LANGUAGE plpgsql AS
$$
DECLARE
partition_date_str varchar;
source_partition_name varchar;
query text;
BEGIN
IF partition_date IS NULL THEN
RAISE EXCEPTION 'partition_date parameter cannot be NULL';
END IF;
SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
END IF;
query := format('
SELECT tenant_id, id, inserted_at, external_id, type, location,
external_location_key, inline_content, updated_at
FROM %I
ORDER BY tenant_id, inserted_at, id, type
LIMIT $1
OFFSET $2
', source_partition_name);
RETURN QUERY EXECUTE query USING limit_param, offset_param;
END;
$$;
CREATE OR REPLACE FUNCTION swap_v1_payload_partition_with_temp(
partition_date date
) RETURNS text
LANGUAGE plpgsql AS
$$
DECLARE
partition_date_str varchar;
source_partition_name varchar;
temp_table_name varchar;
old_pk_name varchar;
new_pk_name varchar;
partition_start date;
partition_end date;
trigger_function_name varchar;
trigger_name varchar;
BEGIN
IF partition_date IS NULL THEN
RAISE EXCEPTION 'partition_date parameter cannot be NULL';
END IF;
SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
SELECT format('v1_payload_offload_tmp_%s', partition_date_str) INTO temp_table_name;
SELECT format('v1_payload_offload_tmp_%s_pkey', partition_date_str) INTO old_pk_name;
SELECT format('v1_payload_%s_pkey', partition_date_str) INTO new_pk_name;
SELECT format('sync_to_%s', temp_table_name) INTO trigger_function_name;
SELECT format('trigger_sync_to_%s', temp_table_name) INTO trigger_name;
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = temp_table_name) THEN
RAISE EXCEPTION 'Temp table % does not exist', temp_table_name;
END IF;
partition_start := partition_date;
partition_end := partition_date + INTERVAL '1 day';
EXECUTE format(
'ALTER TABLE %I SET (
autovacuum_vacuum_scale_factor = ''0.1'',
autovacuum_analyze_scale_factor = ''0.05'',
autovacuum_vacuum_threshold = ''25'',
autovacuum_analyze_threshold = ''25'',
autovacuum_vacuum_cost_delay = ''10'',
autovacuum_vacuum_cost_limit = ''1000''
)',
temp_table_name
);
RAISE NOTICE 'Set autovacuum settings on partition %', temp_table_name;
LOCK TABLE v1_payload IN ACCESS EXCLUSIVE MODE;
RAISE NOTICE 'Dropping trigger from partition %', source_partition_name;
EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I', trigger_name, source_partition_name);
RAISE NOTICE 'Dropping trigger function %', trigger_function_name;
EXECUTE format('DROP FUNCTION IF EXISTS %I()', trigger_function_name);
IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
RAISE NOTICE 'Dropping old partition %', source_partition_name;
EXECUTE format('ALTER TABLE v1_payload DETACH PARTITION %I', source_partition_name);
EXECUTE format('DROP TABLE %I CASCADE', source_partition_name);
END IF;
RAISE NOTICE 'Renaming primary key % to %', old_pk_name, new_pk_name;
EXECUTE format('ALTER INDEX %I RENAME TO %I', old_pk_name, new_pk_name);
RAISE NOTICE 'Renaming temp table % to %', temp_table_name, source_partition_name;
EXECUTE format('ALTER TABLE %I RENAME TO %I', temp_table_name, source_partition_name);
RAISE NOTICE 'Attaching new partition % to v1_payload', source_partition_name;
EXECUTE format(
'ALTER TABLE v1_payload ATTACH PARTITION %I FOR VALUES FROM (%L) TO (%L)',
source_partition_name,
partition_start,
partition_end
);
RAISE NOTICE 'Dropping hack check constraint';
EXECUTE format(
'ALTER TABLE %I DROP CONSTRAINT %I',
source_partition_name,
temp_table_name || '_iat_chk_bounds'
);
RAISE NOTICE 'Successfully swapped partition %', source_partition_name;
RETURN source_partition_name;
END;
$$;
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
DROP TABLE v1_payload_cutover_job_offset;
DROP FUNCTION copy_v1_payload_partition_structure(date);
DROP FUNCTION list_paginated_payloads_for_offload(date, int, bigint);
DROP FUNCTION swap_v1_payload_partition_with_temp(date);
-- +goose StatementEnd
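The migration above adds the building blocks of the cutover job: an offset-tracking table, a function that clones a day's partition into a temp table (and installs a row-level sync trigger on the live partition), a paginated reader, and a swap function. Below is a minimal sketch of the intended call order, assuming a v1_payload_20250101 partition exists and using only the sqlc wrappers added later in this commit; the real orchestration (advisory lock, offset tracking, batching, row-count check) lives in CopyOffloadedPayloadsIntoTempTable further down.

package example

import (
	"context"
	"time"

	"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
	"github.com/jackc/pgx/v5/pgtype"
)

// cutoverSketch shows the intended order of the migration's helper functions.
// Sketch only: not the repository's actual wiring.
func cutoverSketch(ctx context.Context, q *sqlcv1.Queries, db sqlcv1.DBTX) error {
	// Assumes a v1_payload_20250101 partition exists.
	date := pgtype.Date{Time: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC), Valid: true}

	// 1. Clone the partition structure into v1_payload_offload_tmp_20250101 and
	//    install the row-level sync trigger on the live partition.
	if err := q.CreateV1PayloadCutoverTemporaryTable(ctx, db, date); err != nil {
		return err
	}

	// 2. Page through the live partition; inline payloads are offloaded to the
	//    external store and the rewritten rows inserted into the temp table
	//    (offloading, batching, and offset tracking elided here).
	if _, err := q.ListPaginatedPayloadsForOffload(ctx, db, sqlcv1.ListPaginatedPayloadsForOffloadParams{
		Partitiondate: date,
		Limitparam:    1000,
		Offsetparam:   0,
	}); err != nil {
		return err
	}

	// 3. Once the temp table matches the live partition, swap it in under an
	//    ACCESS EXCLUSIVE lock on v1_payload and record completion.
	if err := q.SwapV1PayloadPartitionWithTemp(ctx, db, date); err != nil {
		return err
	}

	return q.MarkCutoverJobAsCompleted(ctx, db, date)
}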


@@ -42,29 +42,27 @@ type TasksController interface {
}
type TasksControllerImpl struct {
mq msgqueue.MessageQueue
pubBuffer *msgqueue.MQPubBuffer
l *zerolog.Logger
queueLogger *zerolog.Logger
pgxStatsLogger *zerolog.Logger
repo repository.EngineRepository
repov1 v1.Repository
dv datautils.DataDecoderValidator
s gocron.Scheduler
a *hatcheterrors.Wrapped
p *partition.Partition
celParser *cel.CELParser
opsPoolPollInterval time.Duration
opsPoolJitter time.Duration
timeoutTaskOperations *operation.OperationPool
reassignTaskOperations *operation.OperationPool
retryTaskOperations *operation.OperationPool
emitSleepOperations *operation.OperationPool
evictExpiredIdempotencyKeysOperations *operation.OperationPool
processPayloadWALOperations *queueutils.OperationPool[int64]
processPayloadExternalCutoversOperations *queueutils.OperationPool[int64]
replayEnabled bool
analyzeCronInterval time.Duration
mq msgqueue.MessageQueue
pubBuffer *msgqueue.MQPubBuffer
l *zerolog.Logger
queueLogger *zerolog.Logger
pgxStatsLogger *zerolog.Logger
repo repository.EngineRepository
repov1 v1.Repository
dv datautils.DataDecoderValidator
s gocron.Scheduler
a *hatcheterrors.Wrapped
p *partition.Partition
celParser *cel.CELParser
opsPoolPollInterval time.Duration
opsPoolJitter time.Duration
timeoutTaskOperations *operation.OperationPool
reassignTaskOperations *operation.OperationPool
retryTaskOperations *operation.OperationPool
emitSleepOperations *operation.OperationPool
evictExpiredIdempotencyKeysOperations *operation.OperationPool
replayEnabled bool
analyzeCronInterval time.Duration
}
type TasksControllerOpt func(*TasksControllerOpts)
@@ -284,9 +282,6 @@ func New(fs ...TasksControllerOpt) (*TasksControllerImpl, error) {
opts.repov1.Tasks().DefaultTaskActivityGauge,
))
t.processPayloadWALOperations = queueutils.NewOperationPool(opts.l, timeout, "process payload WAL", t.processPayloadWAL).WithJitter(jitter)
t.processPayloadExternalCutoversOperations = queueutils.NewOperationPool(opts.l, timeout, "process payload external cutovers", t.processPayloadExternalCutovers).WithJitter(jitter)
return t, nil
}
@@ -332,29 +327,12 @@ func (tc *TasksControllerImpl) Start() (func() error, error) {
return nil, wrappedErr
}
_, err = tc.s.NewJob(
gocron.DurationJob(tc.repov1.Payloads().WALProcessInterval()),
gocron.NewTask(
tc.runProcessPayloadWAL(spanContext),
),
)
if err != nil {
wrappedErr := fmt.Errorf("could not schedule process payload WAL: %w", err)
cancel()
span.RecordError(err)
span.SetStatus(codes.Error, "could not run process payload WAL")
span.End()
return nil, wrappedErr
}
_, err = tc.s.NewJob(
gocron.DurationJob(tc.repov1.Payloads().ExternalCutoverProcessInterval()),
gocron.NewTask(
tc.runProcessPayloadExternalCutovers(spanContext),
tc.processPayloadExternalCutovers(spanContext),
),
gocron.WithSingletonMode(gocron.LimitModeReschedule),
)
if err != nil {


@@ -2,40 +2,24 @@ package task
import (
"context"
"github.com/hatchet-dev/hatchet/pkg/telemetry"
"go.opentelemetry.io/otel/codes"
)
func (tc *TasksControllerImpl) runProcessPayloadWAL(ctx context.Context) func() {
func (tc *TasksControllerImpl) processPayloadExternalCutovers(ctx context.Context) func() {
return func() {
tc.l.Debug().Msgf("processing payload WAL")
ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.processPayloadExternalCutovers")
defer span.End()
partitions := []int64{0, 1, 2, 3}
tc.l.Debug().Msgf("payload external cutover: processing external cutover payloads")
tc.processPayloadWALOperations.SetPartitions(partitions)
err := tc.repov1.Payloads().CopyOffloadedPayloadsIntoTempTable(ctx)
for _, partitionId := range partitions {
tc.processPayloadWALOperations.RunOrContinue(partitionId)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "could not process external cutover payloads")
tc.l.Error().Err(err).Msg("could not process external cutover payloads")
}
}
}
func (tc *TasksControllerImpl) processPayloadWAL(ctx context.Context, partitionNumber int64) (bool, error) {
return tc.repov1.Payloads().ProcessPayloadWAL(ctx, partitionNumber, tc.pubBuffer)
}
func (tc *TasksControllerImpl) runProcessPayloadExternalCutovers(ctx context.Context) func() {
return func() {
tc.l.Debug().Msgf("processing payload external cutovers")
partitions := []int64{0, 1, 2, 3}
tc.processPayloadExternalCutoversOperations.SetPartitions(partitions)
for _, partitionId := range partitions {
tc.processPayloadExternalCutoversOperations.RunOrContinue(partitionId)
}
}
}
func (tc *TasksControllerImpl) processPayloadExternalCutovers(ctx context.Context, partitionNumber int64) (bool, error) {
return tc.repov1.Payloads().ProcessPayloadExternalCutovers(ctx, partitionNumber)
}
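The wiring above replaces the four per-partition WAL operations with a single cutover task scheduled through gocron in singleton mode, so a tick that fires while a previous run is still copying batches is rescheduled rather than run concurrently. A minimal sketch of that scheduling pattern, assuming the gocron v2 API (github.com/go-co-op/gocron/v2) that the controller already uses:

package example

import (
	"fmt"
	"time"

	"github.com/go-co-op/gocron/v2"
)

// scheduleCutoverSketch wires a processing function onto an interval job with
// singleton mode; overlapping ticks are rescheduled instead of stacking.
func scheduleCutoverSketch(process func()) error {
	s, err := gocron.NewScheduler()
	if err != nil {
		return err
	}

	if _, err := s.NewJob(
		gocron.DurationJob(15*time.Second), // ExternalCutoverProcessInterval default
		gocron.NewTask(process),
		gocron.WithSingletonMode(gocron.LimitModeReschedule),
	); err != nil {
		return fmt.Errorf("could not schedule payload external cutover: %w", err)
	}

	s.Start()
	return nil
}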


@@ -293,15 +293,17 @@ func (c *ConfigLoader) InitDataLayer() (res *database.Layer, err error) {
DurableSleepLimit: scf.Runtime.TaskOperationLimits.DurableSleepLimit,
}
inlineStoreTTL := time.Duration(scf.PayloadStore.InlineStoreTTLDays) * 24 * time.Hour
payloadStoreOpts := repov1.PayloadStoreRepositoryOpts{
EnablePayloadDualWrites: scf.PayloadStore.EnablePayloadDualWrites,
EnableTaskEventPayloadDualWrites: scf.PayloadStore.EnableTaskEventPayloadDualWrites,
EnableOLAPPayloadDualWrites: scf.PayloadStore.EnableOLAPPayloadDualWrites,
EnableDagDataPayloadDualWrites: scf.PayloadStore.EnableDagDataPayloadDualWrites,
WALPollLimit: scf.PayloadStore.WALPollLimit,
WALProcessInterval: scf.PayloadStore.WALProcessInterval,
ExternalCutoverProcessInterval: scf.PayloadStore.ExternalCutoverProcessInterval,
WALEnabled: scf.PayloadStore.WALEnabled,
ExternalCutoverBatchSize: scf.PayloadStore.ExternalCutoverBatchSize,
InlineStoreTTL: &inlineStoreTTL,
EnableImmediateOffloads: scf.PayloadStore.EnableImmediateOffloads,
}
statusUpdateOpts := repov1.StatusUpdateBatchSizeLimits{


@@ -642,10 +642,10 @@ type PayloadStoreConfig struct {
EnableTaskEventPayloadDualWrites bool `mapstructure:"enableTaskEventPayloadDualWrites" json:"enableTaskEventPayloadDualWrites,omitempty" default:"true"`
EnableDagDataPayloadDualWrites bool `mapstructure:"enableDagDataPayloadDualWrites" json:"enableDagDataPayloadDualWrites,omitempty" default:"true"`
EnableOLAPPayloadDualWrites bool `mapstructure:"enableOLAPPayloadDualWrites" json:"enableOLAPPayloadDualWrites,omitempty" default:"true"`
WALPollLimit int `mapstructure:"walPollLimit" json:"walPollLimit,omitempty" default:"100"`
WALProcessInterval time.Duration `mapstructure:"walProcessInterval" json:"walProcessInterval,omitempty" default:"15s"`
ExternalCutoverProcessInterval time.Duration `mapstructure:"externalCutoverProcessInterval" json:"externalCutoverProcessInterval,omitempty" default:"15s"`
WALEnabled bool `mapstructure:"walEnabled" json:"walEnabled,omitempty" default:"true"`
ExternalCutoverBatchSize int32 `mapstructure:"externalCutoverBatchSize" json:"externalCutoverBatchSize,omitempty" default:"1000"`
InlineStoreTTLDays int32 `mapstructure:"inlineStoreTTLDays" json:"inlineStoreTTLDays,omitempty" default:"2"`
EnableImmediateOffloads bool `mapstructure:"enableImmediateOffloads" json:"enableImmediateOffloads,omitempty" default:"false"`
}
func (c *ServerConfig) HasService(name string) bool {
@@ -920,10 +920,10 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("payloadStore.enableTaskEventPayloadDualWrites", "SERVER_PAYLOAD_STORE_ENABLE_TASK_EVENT_PAYLOAD_DUAL_WRITES")
_ = v.BindEnv("payloadStore.enableDagDataPayloadDualWrites", "SERVER_PAYLOAD_STORE_ENABLE_DAG_DATA_PAYLOAD_DUAL_WRITES")
_ = v.BindEnv("payloadStore.enableOLAPPayloadDualWrites", "SERVER_PAYLOAD_STORE_ENABLE_OLAP_PAYLOAD_DUAL_WRITES")
_ = v.BindEnv("payloadStore.walPollLimit", "SERVER_PAYLOAD_STORE_WAL_POLL_LIMIT")
_ = v.BindEnv("payloadStore.walProcessInterval", "SERVER_PAYLOAD_STORE_WAL_PROCESS_INTERVAL")
_ = v.BindEnv("payloadStore.externalCutoverProcessInterval", "SERVER_PAYLOAD_STORE_EXTERNAL_CUTOVER_PROCESS_INTERVAL")
_ = v.BindEnv("payloadStore.walEnabled", "SERVER_PAYLOAD_STORE_WAL_ENABLED")
_ = v.BindEnv("payloadStore.externalCutoverBatchSize", "SERVER_PAYLOAD_STORE_EXTERNAL_CUTOVER_BATCH_SIZE")
_ = v.BindEnv("payloadStore.inlineStoreTTLDays", "SERVER_PAYLOAD_STORE_INLINE_STORE_TTL_DAYS")
_ = v.BindEnv("payloadStore.enableImmediateOffloads", "SERVER_PAYLOAD_STORE_ENABLE_IMMEDIATE_OFFLOADS")
// cron operations options
_ = v.BindEnv("cronOperations.taskAnalyzeCronInterval", "SERVER_CRON_OPERATIONS_TASK_ANALYZE_CRON_INTERVAL")


@@ -6,14 +6,12 @@ import (
"sort"
"time"
msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
"github.com/hatchet-dev/hatchet/pkg/telemetry"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
)
type StorePayloadOpts struct {
@@ -57,19 +55,15 @@ type PayloadStoreRepository interface {
Store(ctx context.Context, tx sqlcv1.DBTX, payloads ...StorePayloadOpts) error
Retrieve(ctx context.Context, tx sqlcv1.DBTX, opts ...RetrievePayloadOpts) (map[RetrievePayloadOpts][]byte, error)
RetrieveFromExternal(ctx context.Context, keys ...ExternalPayloadLocationKey) (map[ExternalPayloadLocationKey][]byte, error)
ProcessPayloadWAL(ctx context.Context, partitionNumber int64, pubBuffer *msgqueue.MQPubBuffer) (bool, error)
ProcessPayloadExternalCutovers(ctx context.Context, partitionNumber int64) (bool, error)
OverwriteExternalStore(store ExternalStore, inlineStoreTTL time.Duration)
OverwriteExternalStore(store ExternalStore)
DualWritesEnabled() bool
TaskEventDualWritesEnabled() bool
DagDataDualWritesEnabled() bool
OLAPDualWritesEnabled() bool
WALPollLimit() int
WALProcessInterval() time.Duration
WALEnabled() bool
ExternalCutoverProcessInterval() time.Duration
ExternalStoreEnabled() bool
ExternalStore() ExternalStore
CopyOffloadedPayloadsIntoTempTable(ctx context.Context) error
}
type payloadStoreRepositoryImpl struct {
@@ -83,13 +77,9 @@ type payloadStoreRepositoryImpl struct {
enableTaskEventPayloadDualWrites bool
enableDagDataPayloadDualWrites bool
enableOLAPPayloadDualWrites bool
walPollLimit int
walProcessInterval time.Duration
externalCutoverProcessInterval time.Duration
// walEnabled controls the payload storage strategy:
// - true: Use WAL system (inline + async external offload)
// - false: Direct external storage (skip inline, save DB space)
walEnabled bool
externalCutoverBatchSize int32
enableImmediateOffloads bool
}
type PayloadStoreRepositoryOpts struct {
@@ -97,15 +87,10 @@ type PayloadStoreRepositoryOpts struct {
EnableTaskEventPayloadDualWrites bool
EnableDagDataPayloadDualWrites bool
EnableOLAPPayloadDualWrites bool
WALPollLimit int
WALProcessInterval time.Duration
ExternalCutoverProcessInterval time.Duration
// WALEnabled controls the payload storage strategy:
// - true: Use WAL (Write-Ahead Log) system - store payloads inline in PostgreSQL first,
// then asynchronously offload to external storage via WAL processing
// - false: Skip WAL system - store payloads directly to external storage immediately,
// saving database space by not storing inline content in PostgreSQL
WALEnabled bool
ExternalCutoverBatchSize int32
InlineStoreTTL *time.Duration
EnableImmediateOffloads bool
}
func NewPayloadStoreRepository(
@@ -120,16 +105,15 @@ func NewPayloadStoreRepository(
queries: queries,
externalStoreEnabled: false,
inlineStoreTTL: nil,
inlineStoreTTL: opts.InlineStoreTTL,
externalStore: &NoOpExternalStore{},
enablePayloadDualWrites: opts.EnablePayloadDualWrites,
enableTaskEventPayloadDualWrites: opts.EnableTaskEventPayloadDualWrites,
enableDagDataPayloadDualWrites: opts.EnableDagDataPayloadDualWrites,
enableOLAPPayloadDualWrites: opts.EnableOLAPPayloadDualWrites,
walPollLimit: opts.WALPollLimit,
walProcessInterval: opts.WALProcessInterval,
externalCutoverProcessInterval: opts.ExternalCutoverProcessInterval,
walEnabled: opts.WALEnabled,
externalCutoverBatchSize: opts.ExternalCutoverBatchSize,
enableImmediateOffloads: opts.EnableImmediateOffloads,
}
}
@@ -141,7 +125,6 @@ type PayloadUniqueKey struct {
}
func (p *payloadStoreRepositoryImpl) Store(ctx context.Context, tx sqlcv1.DBTX, payloads ...StorePayloadOpts) error {
// We store payloads as array in order to use PostgreSQL's UNNEST to batch insert them
taskIds := make([]int64, 0, len(payloads))
taskInsertedAts := make([]pgtype.Timestamptz, 0, len(payloads))
payloadTypes := make([]string, 0, len(payloads))
@@ -150,7 +133,6 @@ func (p *payloadStoreRepositoryImpl) Store(ctx context.Context, tx sqlcv1.DBTX,
tenantIds := make([]pgtype.UUID, 0, len(payloads))
locations := make([]string, 0, len(payloads))
externalIds := make([]pgtype.UUID, 0, len(payloads))
// If the WAL is disabled, we'll store the external location key in the payloads table right away
externalLocationKeys := make([]string, 0, len(payloads))
seenPayloadUniqueKeys := make(map[PayloadUniqueKey]struct{})
@@ -160,9 +142,77 @@ func (p *payloadStoreRepositoryImpl) Store(ctx context.Context, tx sqlcv1.DBTX,
return payloads[i].InsertedAt.Time.After(payloads[j].InsertedAt.Time)
})
// Handle WAL_ENABLED logic
if p.walEnabled {
// WAL_ENABLED = true: insert into payloads table and WAL table
if p.enableImmediateOffloads && p.externalStoreEnabled {
externalOpts := make([]OffloadToExternalStoreOpts, 0, len(payloads))
payloadIndexMap := make(map[PayloadUniqueKey]int)
for i, payload := range payloads {
tenantId := sqlchelpers.UUIDFromStr(payload.TenantId)
uniqueKey := PayloadUniqueKey{
ID: payload.Id,
InsertedAt: payload.InsertedAt,
TenantId: tenantId,
Type: payload.Type,
}
if _, exists := seenPayloadUniqueKeys[uniqueKey]; exists {
continue
}
seenPayloadUniqueKeys[uniqueKey] = struct{}{}
payloadIndexMap[uniqueKey] = i
externalOpts = append(externalOpts, OffloadToExternalStoreOpts{
StorePayloadOpts: &payload,
OffloadAt: time.Now(), // Immediate offload
})
}
retrieveOptsToExternalKey, err := p.externalStore.Store(ctx, externalOpts...)
if err != nil {
return fmt.Errorf("failed to store in external store: %w", err)
}
for _, payload := range payloads {
tenantId := sqlchelpers.UUIDFromStr(payload.TenantId)
uniqueKey := PayloadUniqueKey{
ID: payload.Id,
InsertedAt: payload.InsertedAt,
TenantId: tenantId,
Type: payload.Type,
}
if _, exists := seenPayloadUniqueKeys[uniqueKey]; !exists {
continue // Skip if already processed
}
retrieveOpts := RetrievePayloadOpts{
Id: payload.Id,
InsertedAt: payload.InsertedAt,
Type: payload.Type,
TenantId: tenantId,
}
externalKey, exists := retrieveOptsToExternalKey[retrieveOpts]
if !exists {
return fmt.Errorf("external key not found for payload %d", payload.Id)
}
taskIds = append(taskIds, payload.Id)
taskInsertedAts = append(taskInsertedAts, payload.InsertedAt)
payloadTypes = append(payloadTypes, string(payload.Type))
tenantIds = append(tenantIds, tenantId)
locations = append(locations, string(sqlcv1.V1PayloadLocationEXTERNAL))
inlineContents = append(inlineContents, nil)
externalIds = append(externalIds, payload.ExternalId)
externalLocationKeys = append(externalLocationKeys, string(externalKey))
offloadAts = append(offloadAts, pgtype.Timestamptz{Time: time.Now(), Valid: true})
}
} else {
if p.enableImmediateOffloads {
p.l.Warn().Msg("immediate offloads enabled but external store is not enabled, skipping immediate offloads")
}
for _, payload := range payloads {
tenantId := sqlchelpers.UUIDFromStr(payload.TenantId)
uniqueKey := PayloadUniqueKey{
@@ -187,89 +237,12 @@ func (p *payloadStoreRepositoryImpl) Store(ctx context.Context, tx sqlcv1.DBTX,
externalIds = append(externalIds, payload.ExternalId)
externalLocationKeys = append(externalLocationKeys, "")
if p.externalStoreEnabled {
offloadAts = append(offloadAts, pgtype.Timestamptz{Time: payload.InsertedAt.Time.Add(*p.inlineStoreTTL), Valid: true})
} else {
offloadAts = append(offloadAts, pgtype.Timestamptz{Time: time.Now(), Valid: true})
}
}
} else {
// WAL_ENABLED = false: Skip inline, go directly to external store, we will:
// 1. Prepare external store options for immediate offload
// 2. Call external store first to get storage keys (S3/GCS URLs)
// 3. Process results and prepare for PostgreSQL with external keys only
if !p.externalStoreEnabled {
return fmt.Errorf("external store must be enabled when WAL is disabled")
}
// 1. Prepare external store options for immediate offload
externalOpts := make([]OffloadToExternalStoreOpts, 0, len(payloads))
payloadIndexMap := make(map[PayloadUniqueKey]int) // Map to track original payload indices
for i, payload := range payloads {
tenantId := sqlchelpers.UUIDFromStr(payload.TenantId)
uniqueKey := PayloadUniqueKey{
ID: payload.Id,
InsertedAt: payload.InsertedAt,
TenantId: tenantId,
Type: payload.Type,
offloadAt := pgtype.Timestamptz{Time: time.Now(), Valid: true}
if p.inlineStoreTTL != nil {
offloadAt = pgtype.Timestamptz{Time: time.Now().Add(*p.inlineStoreTTL), Valid: true}
}
if _, exists := seenPayloadUniqueKeys[uniqueKey]; exists {
continue
}
seenPayloadUniqueKeys[uniqueKey] = struct{}{}
payloadIndexMap[uniqueKey] = i
externalOpts = append(externalOpts, OffloadToExternalStoreOpts{
StorePayloadOpts: &payload,
OffloadAt: time.Now(), // Immediate offload
})
}
// 2. Call external store first to get storage keys (S3/GCS URLs)
externalKeys, err := p.externalStore.Store(ctx, externalOpts...)
if err != nil {
return fmt.Errorf("failed to store in external store: %w", err)
}
// 3. Process results and prepare for PostgreSQL with external keys only
for _, payload := range payloads {
tenantId := sqlchelpers.UUIDFromStr(payload.TenantId)
uniqueKey := PayloadUniqueKey{
ID: payload.Id,
InsertedAt: payload.InsertedAt,
TenantId: tenantId,
Type: payload.Type,
}
if _, exists := seenPayloadUniqueKeys[uniqueKey]; !exists {
continue // Skip if already processed
}
// Find the external key for this payload
retrieveOpts := RetrievePayloadOpts{
Id: payload.Id,
InsertedAt: payload.InsertedAt,
Type: payload.Type,
TenantId: tenantId,
}
externalKey, exists := externalKeys[retrieveOpts]
if !exists {
return fmt.Errorf("external key not found for payload %d", payload.Id)
}
taskIds = append(taskIds, payload.Id)
taskInsertedAts = append(taskInsertedAts, payload.InsertedAt)
payloadTypes = append(payloadTypes, string(payload.Type))
tenantIds = append(tenantIds, tenantId)
locations = append(locations, string(sqlcv1.V1PayloadLocationEXTERNAL))
inlineContents = append(inlineContents, nil) // No inline content
externalIds = append(externalIds, payload.ExternalId)
externalLocationKeys = append(externalLocationKeys, string(externalKey))
offloadAts = append(offloadAts, pgtype.Timestamptz{Time: time.Now(), Valid: true})
offloadAts = append(offloadAts, offloadAt)
}
}
@@ -288,22 +261,6 @@ func (p *payloadStoreRepositoryImpl) Store(ctx context.Context, tx sqlcv1.DBTX,
return fmt.Errorf("failed to write payloads: %w", err)
}
// Only write to WAL if external store is configured AND WAL is enabled
// When WAL is disabled, payloads are already in external storage, so no WAL needed
if p.externalStoreEnabled && p.walEnabled {
err = p.queries.WritePayloadWAL(ctx, tx, sqlcv1.WritePayloadWALParams{
Tenantids: tenantIds,
Payloadids: taskIds,
Payloadinsertedats: taskInsertedAts,
Payloadtypes: payloadTypes,
Offloadats: offloadAts,
})
if err != nil {
return fmt.Errorf("failed to write payload WAL: %w", err)
}
}
return err
}
@@ -393,274 +350,8 @@ func (p *payloadStoreRepositoryImpl) retrieve(ctx context.Context, tx sqlcv1.DBT
return optsToPayload, nil
}
func (p *payloadStoreRepositoryImpl) offloadToExternal(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error) {
ctx, span := telemetry.NewSpan(ctx, "payloadstore.offload_to_external_store")
defer span.End()
span.SetAttributes(attribute.Int("payloadstore.offload_to_external_store.num_payloads_to_offload", len(payloads)))
// this is only intended to be called from ProcessPayloadWAL, which short-circuits if external store is not enabled
if !p.externalStoreEnabled {
return nil, fmt.Errorf("external store not enabled")
}
return p.externalStore.Store(ctx, payloads...)
}
func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, partitionNumber int64, pubBuffer *msgqueue.MQPubBuffer) (bool, error) {
// no need to process the WAL if external store is not enabled
if !p.externalStoreEnabled {
return false, nil
}
ctx, span := telemetry.NewSpan(ctx, "payloadstore.process_payload_wal")
defer span.End()
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 30000)
if err != nil {
return false, fmt.Errorf("failed to prepare transaction: %w", err)
}
defer rollback()
advisoryLockAcquired, err := p.queries.TryAdvisoryLock(ctx, tx, hash(fmt.Sprintf("process-payload-wal-lease-%d", partitionNumber)))
if err != nil {
return false, fmt.Errorf("failed to acquire advisory lock: %w", err)
}
if !advisoryLockAcquired {
return false, nil
}
walRecords, err := p.queries.PollPayloadWALForRecordsToReplicate(ctx, tx, sqlcv1.PollPayloadWALForRecordsToReplicateParams{
Polllimit: int32(p.walPollLimit),
Partitionnumber: int32(partitionNumber),
})
if err != nil {
return false, err
}
hasMoreWALRecords := len(walRecords) == p.walPollLimit
if len(walRecords) == 0 {
return false, nil
}
retrieveOpts := make([]RetrievePayloadOpts, len(walRecords))
retrieveOptsToOffloadAt := make(map[RetrievePayloadOpts]pgtype.Timestamptz)
retrieveOptsToPayload := make(map[RetrievePayloadOpts][]byte)
for i, record := range walRecords {
opts := RetrievePayloadOpts{
Id: record.PayloadID,
InsertedAt: record.PayloadInsertedAt,
Type: record.PayloadType,
TenantId: record.TenantID,
}
retrieveOpts[i] = opts
retrieveOptsToOffloadAt[opts] = record.OffloadAt
if record.Location == sqlcv1.V1PayloadLocationINLINE {
retrieveOptsToPayload[opts] = record.InlineContent
}
}
externalStoreOpts := make([]OffloadToExternalStoreOpts, 0)
minOffloadAt := time.Now().Add(100 * time.Hour)
offloadLag := time.Since(minOffloadAt).Seconds()
attrs := []attribute.KeyValue{
{
Key: "payloadstore.process_payload_wal.payload_wal_offload_partition_number",
Value: attribute.Int64Value(partitionNumber),
},
{
Key: "payloadstore.process_payload_wal.payload_wal_offload_count",
Value: attribute.IntValue(len(retrieveOpts)),
},
{
Key: "payloadstore.process_payload_wal.payload_wal_offload_lag_seconds",
Value: attribute.Float64Value(offloadLag),
},
}
span.SetAttributes(attrs...)
for _, opts := range retrieveOpts {
offloadAt, ok := retrieveOptsToOffloadAt[opts]
if !ok {
return false, fmt.Errorf("offload at not found for opts: %+v", opts)
}
externalStoreOpts = append(externalStoreOpts, OffloadToExternalStoreOpts{
StorePayloadOpts: &StorePayloadOpts{
Id: opts.Id,
InsertedAt: opts.InsertedAt,
Type: opts.Type,
Payload: retrieveOptsToPayload[opts],
TenantId: opts.TenantId.String(),
},
OffloadAt: offloadAt.Time,
})
if offloadAt.Time.Before(minOffloadAt) {
minOffloadAt = offloadAt.Time
}
}
if err := commit(ctx); err != nil {
return false, err
}
retrieveOptsToStoredKey, err := p.offloadToExternal(ctx, externalStoreOpts...)
if err != nil {
return false, err
}
offloadAts := make([]pgtype.Timestamptz, 0, len(retrieveOptsToStoredKey))
ids := make([]int64, 0, len(retrieveOptsToStoredKey))
insertedAts := make([]pgtype.Timestamptz, 0, len(retrieveOptsToStoredKey))
types := make([]string, 0, len(retrieveOptsToStoredKey))
tenantIds := make([]pgtype.UUID, 0, len(retrieveOptsToStoredKey))
externalLocationKeys := make([]string, 0, len(retrieveOptsToStoredKey))
for _, opt := range retrieveOpts {
offloadAt, exists := retrieveOptsToOffloadAt[opt]
if !exists {
return false, fmt.Errorf("offload at not found for opts: %+v", opt)
}
offloadAts = append(offloadAts, offloadAt)
ids = append(ids, opt.Id)
insertedAts = append(insertedAts, opt.InsertedAt)
types = append(types, string(opt.Type))
key, ok := retrieveOptsToStoredKey[opt]
if !ok {
// important: if there's no key here, it's likely because the payloads table did not contain the payload
// this is okay - it can happen if e.g. a payload partition is dropped before the WAL is processed (not a great situation, but not catastrophic)
// if this happens, we log an error and set the key to `""` which will allow it to be evicted from the WAL. it'll never cause
// an update in the payloads table because there won't be a matching row
p.l.Error().Int64("id", opt.Id).Time("insertedAt", opt.InsertedAt.Time).Msg("external location key not found for opts")
key = ""
}
externalLocationKeys = append(externalLocationKeys, string(key))
tenantIds = append(tenantIds, opt.TenantId)
}
// Second transaction, persist the offload to the db once we've successfully offloaded to the external store
tx, commit, rollback, err = sqlchelpers.PrepareTx(ctx, p.pool, p.l, 5000)
defer rollback()
if err != nil {
return false, fmt.Errorf("failed to prepare transaction for offloading: %w", err)
}
updatedPayloads, err := p.queries.SetPayloadExternalKeys(ctx, tx, sqlcv1.SetPayloadExternalKeysParams{
Ids: ids,
Insertedats: insertedAts,
Payloadtypes: types,
Offloadats: offloadAts,
Tenantids: tenantIds,
Externallocationkeys: externalLocationKeys,
})
if err != nil {
return false, err
}
tenantIdToPayloads := make(map[string][]OLAPPayloadToOffload)
for _, updatedPayload := range updatedPayloads {
if updatedPayload == nil || updatedPayload.Type == sqlcv1.V1PayloadTypeTASKEVENTDATA {
continue
}
tenantIdToPayloads[updatedPayload.TenantID.String()] = append(tenantIdToPayloads[updatedPayload.TenantID.String()], OLAPPayloadToOffload{
ExternalId: updatedPayload.ExternalID,
ExternalLocationKey: updatedPayload.ExternalLocationKey.String,
})
}
if err := commit(ctx); err != nil {
return false, err
}
// todo: make this transactionally safe
// there's no application-level risk here because the worst case if
// we miss an event is we don't mark the payload as external and there's a bit
// of disk bloat, but it'd be good to not need to worry about that
for tenantId, payloads := range tenantIdToPayloads {
msg, err := OLAPPayloadOffloadMessage(tenantId, payloads)
if err != nil {
return false, fmt.Errorf("failed to create OLAP payload offload message: %w", err)
}
pubBuffer.Pub(ctx, msgqueue.OLAP_QUEUE, msg, false)
}
return hasMoreWALRecords, nil
}
func (p *payloadStoreRepositoryImpl) ProcessPayloadExternalCutovers(ctx context.Context, partitionNumber int64) (bool, error) {
// no need to cut over if external store is not enabled
if !p.externalStoreEnabled {
return false, nil
}
ctx, span := telemetry.NewSpan(ctx, "payloadstore.process_payload_external_cutovers")
defer span.End()
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 30000)
if err != nil {
return false, fmt.Errorf("failed to prepare transaction: %w", err)
}
defer rollback()
advisoryLockAcquired, err := p.queries.TryAdvisoryLock(ctx, tx, hash(fmt.Sprintf("process-payload-cut-overs-lease-%d", partitionNumber)))
if err != nil {
return false, fmt.Errorf("failed to acquire advisory lock: %w", err)
}
if !advisoryLockAcquired {
return false, nil
}
queueItemsCutOver, err := p.queries.CutOverPayloadsToExternal(ctx, tx, sqlcv1.CutOverPayloadsToExternalParams{
Polllimit: int32(p.walPollLimit), // nolint: gosec
Partitionnumber: int32(partitionNumber), // nolint: gosec
})
if err != nil {
return false, err
}
if queueItemsCutOver == 0 {
return false, nil
}
hasMoreQueueItems := int(queueItemsCutOver) == p.walPollLimit
if err := commit(ctx); err != nil {
return false, err
}
return hasMoreQueueItems, nil
}
func (p *payloadStoreRepositoryImpl) OverwriteExternalStore(store ExternalStore, inlineStoreTTL time.Duration) {
func (p *payloadStoreRepositoryImpl) OverwriteExternalStore(store ExternalStore) {
p.externalStoreEnabled = true
p.inlineStoreTTL = &inlineStoreTTL
p.externalStore = store
}
@@ -680,14 +371,6 @@ func (p *payloadStoreRepositoryImpl) OLAPDualWritesEnabled() bool {
return p.enableOLAPPayloadDualWrites
}
func (p *payloadStoreRepositoryImpl) WALPollLimit() int {
return p.walPollLimit
}
func (p *payloadStoreRepositoryImpl) WALProcessInterval() time.Duration {
return p.walProcessInterval
}
func (p *payloadStoreRepositoryImpl) ExternalCutoverProcessInterval() time.Duration {
return p.externalCutoverProcessInterval
}
@@ -700,8 +383,295 @@ func (p *payloadStoreRepositoryImpl) ExternalStore() ExternalStore {
return p.externalStore
}
func (p *payloadStoreRepositoryImpl) WALEnabled() bool {
return p.walEnabled
type BulkCutOverPayload struct {
TenantID pgtype.UUID
Id int64
InsertedAt pgtype.Timestamptz
ExternalId pgtype.UUID
Type sqlcv1.V1PayloadType
ExternalLocationKey ExternalPayloadLocationKey
}
type CutoverBatchOutcome struct {
ShouldContinue bool
NextOffset int64
}
func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Context, partitionDate pgtype.Date, partitionDateStr string, offset int64) (*CutoverBatchOutcome, error) {
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000)
if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
}
defer rollback()
tableName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDateStr)
payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedPayloadsForOffloadParams{
Partitiondate: partitionDate,
Offsetparam: offset,
Limitparam: p.externalCutoverBatchSize,
})
if err != nil {
return nil, fmt.Errorf("failed to list payloads for offload: %w", err)
}
alreadyExternalPayloads := make(map[RetrievePayloadOpts]ExternalPayloadLocationKey)
offloadOpts := make([]OffloadToExternalStoreOpts, 0, len(payloads))
retrieveOptsToExternalId := make(map[RetrievePayloadOpts]string)
for _, payload := range payloads {
if payload.Location != sqlcv1.V1PayloadLocationINLINE {
retrieveOpt := RetrievePayloadOpts{
Id: payload.ID,
InsertedAt: payload.InsertedAt,
Type: payload.Type,
TenantId: payload.TenantID,
}
alreadyExternalPayloads[retrieveOpt] = ExternalPayloadLocationKey(payload.ExternalLocationKey)
retrieveOptsToExternalId[retrieveOpt] = payload.ExternalID.String()
} else {
offloadOpts = append(offloadOpts, OffloadToExternalStoreOpts{
StorePayloadOpts: &StorePayloadOpts{
Id: payload.ID,
InsertedAt: payload.InsertedAt,
Type: payload.Type,
Payload: payload.InlineContent,
TenantId: payload.TenantID.String(),
ExternalId: payload.ExternalID,
},
OffloadAt: time.Now(),
})
retrieveOptsToExternalId[RetrievePayloadOpts{
Id: payload.ID,
InsertedAt: payload.InsertedAt,
Type: payload.Type,
TenantId: payload.TenantID,
}] = payload.ExternalID.String()
}
}
retrieveOptsToKey, err := p.ExternalStore().Store(ctx, offloadOpts...)
if err != nil {
return nil, fmt.Errorf("failed to offload payloads to external store: %w", err)
}
for r, k := range alreadyExternalPayloads {
retrieveOptsToKey[r] = k
}
payloadsToInsert := make([]sqlcv1.CutoverPayloadToInsert, 0, len(payloads))
for r, k := range retrieveOptsToKey {
externalId := retrieveOptsToExternalId[r]
payloadsToInsert = append(payloadsToInsert, sqlcv1.CutoverPayloadToInsert{
TenantID: r.TenantId,
ID: r.Id,
InsertedAt: r.InsertedAt,
ExternalID: sqlchelpers.UUIDFromStr(externalId),
Type: r.Type,
ExternalLocationKey: string(k),
})
}
_, err = sqlcv1.InsertCutOverPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert)
if err != nil {
return nil, fmt.Errorf("failed to copy offloaded payloads into temp table: %w", err)
}
offset += int64(len(payloads))
err = p.queries.UpsertLastOffsetForCutoverJob(ctx, tx, sqlcv1.UpsertLastOffsetForCutoverJobParams{
Key: partitionDate,
Lastoffset: offset,
})
if err != nil {
return nil, fmt.Errorf("failed to upsert last offset for cutover job: %w", err)
}
if err := commit(ctx); err != nil {
return nil, fmt.Errorf("failed to commit copy offloaded payloads transaction: %w", err)
}
if len(payloads) < int(p.externalCutoverBatchSize) {
return &CutoverBatchOutcome{
ShouldContinue: false,
NextOffset: offset,
}, nil
}
return &CutoverBatchOutcome{
ShouldContinue: true,
NextOffset: offset,
}, nil
}
type CutoverJobRunMetadata struct {
ShouldRun bool
LastOffset int64
PartitionDate pgtype.Date
PartitionDateStr string
}
func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context) (*CutoverJobRunMetadata, error) {
if p.inlineStoreTTL == nil {
return nil, fmt.Errorf("inline store TTL is not set")
}
partitionTime := time.Now().Add(-1 * *p.inlineStoreTTL)
partitionDate := pgtype.Date{
Time: partitionTime,
Valid: true,
}
partitionDateStr := partitionTime.Format("20060102")
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000)
if err != nil {
return nil, err
}
defer rollback()
hashKey := fmt.Sprintf("payload-cutover-temp-table-lease-%s", partitionDateStr)
lockAcquired, err := p.queries.TryAdvisoryLock(ctx, tx, hash(hashKey))
if err != nil {
return nil, fmt.Errorf("failed to acquire advisory lock for payload cutover temp table: %w", err)
}
if !lockAcquired {
return &CutoverJobRunMetadata{
ShouldRun: false,
LastOffset: 0,
PartitionDate: partitionDate,
PartitionDateStr: partitionDateStr,
}, nil
}
jobStatus, err := p.queries.FindLastOffsetForCutoverJob(ctx, p.pool, partitionDate)
var offset int64
var isCompleted bool
if err != nil {
if err == pgx.ErrNoRows {
offset = 0
isCompleted = false
} else {
return nil, fmt.Errorf("failed to find last offset for cutover job: %w", err)
}
} else {
offset = jobStatus.LastOffset
isCompleted = jobStatus.IsCompleted
}
if isCompleted {
return &CutoverJobRunMetadata{
ShouldRun: false,
LastOffset: offset,
PartitionDate: partitionDate,
PartitionDateStr: partitionDateStr,
}, nil
}
err = p.queries.CreateV1PayloadCutoverTemporaryTable(ctx, tx, partitionDate)
if err != nil {
return nil, fmt.Errorf("failed to create payload cutover temporary table: %w", err)
}
if err := commit(ctx); err != nil {
return nil, fmt.Errorf("failed to commit copy offloaded payloads transaction: %w", err)
}
return &CutoverJobRunMetadata{
ShouldRun: true,
LastOffset: offset,
PartitionDate: partitionDate,
PartitionDateStr: partitionDateStr,
}, nil
}
func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx context.Context) error {
if !p.externalStoreEnabled {
return nil
}
jobMeta, err := p.prepareCutoverTableJob(ctx)
if err != nil {
return fmt.Errorf("failed to prepare cutover table job: %w", err)
}
if !jobMeta.ShouldRun {
return nil
}
partitionDate := jobMeta.PartitionDate
partitionDateStr := jobMeta.PartitionDateStr
offset := jobMeta.LastOffset
for {
outcome, err := p.ProcessPayloadCutoverBatch(ctx, partitionDate, partitionDateStr, offset)
if err != nil {
return fmt.Errorf("failed to process payload cutover batch: %w", err)
}
if !outcome.ShouldContinue {
break
}
offset = outcome.NextOffset
}
tempPartitionName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDateStr)
sourcePartitionName := fmt.Sprintf("v1_payload_%s", partitionDateStr)
countsEqual, err := sqlcv1.ComparePartitionRowCounts(ctx, p.pool, tempPartitionName, sourcePartitionName)
if err != nil {
return fmt.Errorf("failed to compare partition row counts: %w", err)
}
if !countsEqual {
return fmt.Errorf("row counts do not match between temp and source partitions for date %s", partitionDateStr)
}
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000)
if err != nil {
return fmt.Errorf("failed to prepare transaction for swapping payload cutover temp table: %w", err)
}
defer rollback()
err = p.queries.SwapV1PayloadPartitionWithTemp(ctx, tx, partitionDate)
if err != nil {
return fmt.Errorf("failed to swap payload cutover temp table: %w", err)
}
err = p.queries.MarkCutoverJobAsCompleted(ctx, tx, partitionDate)
if err != nil {
return fmt.Errorf("failed to mark cutover job as completed: %w", err)
}
if err := commit(ctx); err != nil {
return fmt.Errorf("failed to commit swap payload cutover temp table transaction: %w", err)
}
return nil
}
type NoOpExternalStore struct{}
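The job persists its progress per partition date in v1_payload_cutover_job_offset, so an interrupted or rescheduled run resumes from the last committed batch instead of starting over. A minimal sketch of that bookkeeping using only the wrappers added in this commit; the real loop does this inside ProcessPayloadCutoverBatch, in the same transaction as the temp-table insert, and the batch size of 1000 is illustrative.

package example

import (
	"context"
	"errors"

	"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgtype"
)

// resumeOffsetSketch reads the persisted offset for a partition date and, after
// a hypothetical batch, advances it so a later run picks up where this one stopped.
func resumeOffsetSketch(ctx context.Context, q *sqlcv1.Queries, db sqlcv1.DBTX, date pgtype.Date) (int64, error) {
	status, err := q.FindLastOffsetForCutoverJob(ctx, db, date)
	if errors.Is(err, pgx.ErrNoRows) {
		return 0, nil // first run for this partition date starts at offset 0
	}
	if err != nil {
		return 0, err
	}
	if status.IsCompleted {
		return status.LastOffset, nil // partition already swapped, nothing to do
	}

	// ... copy the next batch starting at status.LastOffset, then persist the
	// new offset so a crashed or rescheduled run resumes here.
	next := status.LastOffset + 1000
	if err := q.UpsertLastOffsetForCutoverJob(ctx, db, sqlcv1.UpsertLastOffsetForCutoverJobParams{
		Key:        date,
		Lastoffset: next,
	}); err != nil {
		return 0, err
	}
	return next, nil
}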


@@ -27,7 +27,7 @@ type Repository interface {
Logs() LogLineRepository
OverwriteLogsRepository(l LogLineRepository)
Payloads() PayloadStoreRepository
OverwriteExternalPayloadStore(o ExternalStore, nativeStoreTTL time.Duration)
OverwriteExternalPayloadStore(o ExternalStore)
Workers() WorkerRepository
Workflows() WorkflowRepository
Ticker() TickerRepository
@@ -126,8 +126,8 @@ func (r *repositoryImpl) Payloads() PayloadStoreRepository {
return r.payloadStore
}
func (r *repositoryImpl) OverwriteExternalPayloadStore(o ExternalStore, nativeStoreTTL time.Duration) {
r.payloadStore.OverwriteExternalStore(o, nativeStoreTTL)
func (r *repositoryImpl) OverwriteExternalPayloadStore(o ExternalStore) {
r.payloadStore.OverwriteExternalStore(o)
}
func (r *repositoryImpl) Workers() WorkerRepository {


@@ -3129,6 +3129,12 @@ type V1Payload struct {
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type V1PayloadCutoverJobOffset struct {
Key pgtype.Date `json:"key"`
LastOffset int64 `json:"last_offset"`
IsCompleted bool `json:"is_completed"`
}
type V1PayloadCutoverQueueItem struct {
TenantID pgtype.UUID `json:"tenant_id"`
CutOverAt pgtype.Timestamptz `json:"cut_over_at"`


@@ -0,0 +1,115 @@
package sqlcv1
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgtype"
)
type CutoverPayloadToInsert struct {
TenantID pgtype.UUID
ID int64
InsertedAt pgtype.Timestamptz
ExternalID pgtype.UUID
Type V1PayloadType
ExternalLocationKey string
}
func InsertCutOverPayloadsIntoTempTable(ctx context.Context, tx DBTX, tableName string, payloads []CutoverPayloadToInsert) (int64, error) {
tenantIds := make([]pgtype.UUID, 0, len(payloads))
ids := make([]int64, 0, len(payloads))
insertedAts := make([]pgtype.Timestamptz, 0, len(payloads))
externalIds := make([]pgtype.UUID, 0, len(payloads))
types := make([]string, 0, len(payloads))
locations := make([]string, 0, len(payloads))
externalLocationKeys := make([]string, 0, len(payloads))
for _, payload := range payloads {
externalIds = append(externalIds, payload.ExternalID)
tenantIds = append(tenantIds, payload.TenantID)
ids = append(ids, payload.ID)
insertedAts = append(insertedAts, payload.InsertedAt)
types = append(types, string(payload.Type))
locations = append(locations, string(V1PayloadLocationEXTERNAL))
externalLocationKeys = append(externalLocationKeys, string(payload.ExternalLocationKey))
}
row := tx.QueryRow(
ctx,
fmt.Sprintf(
// we unfortunately need to use `INSERT INTO` instead of `COPY` here
// because we can't have conflict resolution with `COPY`.
`
WITH inputs AS (
SELECT
UNNEST($1::UUID[]) AS tenant_id,
UNNEST($2::BIGINT[]) AS id,
UNNEST($3::TIMESTAMPTZ[]) AS inserted_at,
UNNEST($4::UUID[]) AS external_id,
UNNEST($5::TEXT[]) AS type,
UNNEST($6::TEXT[]) AS location,
UNNEST($7::TEXT[]) AS external_location_key
), inserts AS (
INSERT INTO %s (tenant_id, id, inserted_at, external_id, type, location, external_location_key, inline_content, updated_at)
SELECT
tenant_id,
id,
inserted_at,
external_id,
type::v1_payload_type,
location::v1_payload_location,
external_location_key,
NULL,
NOW()
FROM inputs
ORDER BY tenant_id, inserted_at, id, type
ON CONFLICT(tenant_id, id, inserted_at, type) DO NOTHING
RETURNING *
)
SELECT COUNT(*)
FROM inserts
`,
tableName,
),
tenantIds,
ids,
insertedAts,
externalIds,
types,
locations,
externalLocationKeys,
)
var copyCount int64
err := row.Scan(&copyCount)
return copyCount, err
}
func ComparePartitionRowCounts(ctx context.Context, tx DBTX, tempPartitionName, sourcePartitionName string) (bool, error) {
row := tx.QueryRow(
ctx,
fmt.Sprintf(
`
SELECT
(SELECT COUNT(*) FROM %s) AS temp_partition_count,
(SELECT COUNT(*) FROM %s) AS source_partition_count
`,
tempPartitionName,
sourcePartitionName,
),
)
var tempPartitionCount int64
var sourcePartitionCount int64
err := row.Scan(&tempPartitionCount, &sourcePartitionCount)
if err != nil {
return false, err
}
return tempPartitionCount == sourcePartitionCount, nil
}
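The two helpers above are hand-written rather than sqlc-generated because the temp partition name is only known at runtime, so the SQL is built with fmt.Sprintf. A short usage sketch under that assumption; dateStr is a hypothetical parameter holding the YYYYMMDD partition date.

package example

import (
	"context"
	"fmt"

	"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
)

// verifyAndCountSketch inserts one rewritten batch into the temp table and then
// checks that the temp table and the live partition agree before any swap.
func verifyAndCountSketch(ctx context.Context, db sqlcv1.DBTX, dateStr string, batch []sqlcv1.CutoverPayloadToInsert) error {
	tempTable := fmt.Sprintf("v1_payload_offload_tmp_%s", dateStr)
	sourceTable := fmt.Sprintf("v1_payload_%s", dateStr)

	// Insert the rewritten rows; ON CONFLICT DO NOTHING makes retried batches safe.
	inserted, err := sqlcv1.InsertCutOverPayloadsIntoTempTable(ctx, db, tempTable, batch)
	if err != nil {
		return err
	}

	// Before the swap, require the temp table and the live partition to agree.
	equal, err := sqlcv1.ComparePartitionRowCounts(ctx, db, tempTable, sourceTable)
	if err != nil {
		return err
	}
	if !equal {
		return fmt.Errorf("row count mismatch after inserting %d rows", inserted)
	}
	return nil
}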


@@ -49,6 +49,7 @@ SELECT
i.inline_content
FROM
inputs i
ORDER BY i.tenant_id, i.inserted_at, i.id, i.type
ON CONFLICT (tenant_id, id, inserted_at, type)
DO UPDATE SET
location = EXCLUDED.location,
@@ -214,3 +215,49 @@ FROM queue_items
-- name: AnalyzeV1Payload :exec
ANALYZE v1_payload;
-- name: ListPaginatedPayloadsForOffload :many
WITH payloads AS (
SELECT
(p).*
FROM list_paginated_payloads_for_offload(
@partitionDate::DATE,
@limitParam::INT,
@offsetParam::BIGINT
) p
)
SELECT
tenant_id::UUID,
id::BIGINT,
inserted_at::TIMESTAMPTZ,
external_id::UUID,
type::v1_payload_type,
location::v1_payload_location,
COALESCE(external_location_key, '')::TEXT AS external_location_key,
inline_content::JSONB AS inline_content,
updated_at::TIMESTAMPTZ
FROM payloads;
-- name: CreateV1PayloadCutoverTemporaryTable :exec
SELECT copy_v1_payload_partition_structure(@date::DATE);
-- name: SwapV1PayloadPartitionWithTemp :exec
SELECT swap_v1_payload_partition_with_temp(@date::DATE);
-- name: FindLastOffsetForCutoverJob :one
SELECT *
FROM v1_payload_cutover_job_offset
WHERE key = @key::DATE;
-- name: UpsertLastOffsetForCutoverJob :exec
INSERT INTO v1_payload_cutover_job_offset (key, last_offset)
VALUES (@key::DATE, @lastOffset::BIGINT)
ON CONFLICT (key)
DO UPDATE SET last_offset = EXCLUDED.last_offset
;
-- name: MarkCutoverJobAsCompleted :exec
UPDATE v1_payload_cutover_job_offset
SET is_completed = TRUE
WHERE key = @key::DATE
;


@@ -20,6 +20,15 @@ func (q *Queries) AnalyzeV1Payload(ctx context.Context, db DBTX) error {
return err
}
const createV1PayloadCutoverTemporaryTable = `-- name: CreateV1PayloadCutoverTemporaryTable :exec
SELECT copy_v1_payload_partition_structure($1::DATE)
`
func (q *Queries) CreateV1PayloadCutoverTemporaryTable(ctx context.Context, db DBTX, date pgtype.Date) error {
_, err := db.Exec(ctx, createV1PayloadCutoverTemporaryTable, date)
return err
}
const cutOverPayloadsToExternal = `-- name: CutOverPayloadsToExternal :one
WITH tenants AS (
SELECT UNNEST(
@@ -74,6 +83,101 @@ func (q *Queries) CutOverPayloadsToExternal(ctx context.Context, db DBTX, arg Cu
return count, err
}
const findLastOffsetForCutoverJob = `-- name: FindLastOffsetForCutoverJob :one
SELECT key, last_offset, is_completed
FROM v1_payload_cutover_job_offset
WHERE key = $1::DATE
`
func (q *Queries) FindLastOffsetForCutoverJob(ctx context.Context, db DBTX, key pgtype.Date) (*V1PayloadCutoverJobOffset, error) {
row := db.QueryRow(ctx, findLastOffsetForCutoverJob, key)
var i V1PayloadCutoverJobOffset
err := row.Scan(&i.Key, &i.LastOffset, &i.IsCompleted)
return &i, err
}
const listPaginatedPayloadsForOffload = `-- name: ListPaginatedPayloadsForOffload :many
WITH payloads AS (
SELECT
(p).*
FROM list_paginated_payloads_for_offload(
$1::DATE,
$2::INT,
$3::BIGINT
) p
)
SELECT
tenant_id::UUID,
id::BIGINT,
inserted_at::TIMESTAMPTZ,
external_id::UUID,
type::v1_payload_type,
location::v1_payload_location,
COALESCE(external_location_key, '')::TEXT AS external_location_key,
inline_content::JSONB AS inline_content,
updated_at::TIMESTAMPTZ
FROM payloads
`
type ListPaginatedPayloadsForOffloadParams struct {
Partitiondate pgtype.Date `json:"partitiondate"`
Limitparam int32 `json:"limitparam"`
Offsetparam int64 `json:"offsetparam"`
}
type ListPaginatedPayloadsForOffloadRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
ExternalID pgtype.UUID `json:"external_id"`
Type V1PayloadType `json:"type"`
Location V1PayloadLocation `json:"location"`
ExternalLocationKey string `json:"external_location_key"`
InlineContent []byte `json:"inline_content"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
func (q *Queries) ListPaginatedPayloadsForOffload(ctx context.Context, db DBTX, arg ListPaginatedPayloadsForOffloadParams) ([]*ListPaginatedPayloadsForOffloadRow, error) {
rows, err := db.Query(ctx, listPaginatedPayloadsForOffload, arg.Partitiondate, arg.Limitparam, arg.Offsetparam)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ListPaginatedPayloadsForOffloadRow
for rows.Next() {
var i ListPaginatedPayloadsForOffloadRow
if err := rows.Scan(
&i.TenantID,
&i.ID,
&i.InsertedAt,
&i.ExternalID,
&i.Type,
&i.Location,
&i.ExternalLocationKey,
&i.InlineContent,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const markCutoverJobAsCompleted = `-- name: MarkCutoverJobAsCompleted :exec
UPDATE v1_payload_cutover_job_offset
SET is_completed = TRUE
WHERE key = $1::DATE
`
func (q *Queries) MarkCutoverJobAsCompleted(ctx context.Context, db DBTX, key pgtype.Date) error {
_, err := db.Exec(ctx, markCutoverJobAsCompleted, key)
return err
}
const pollPayloadWALForRecordsToReplicate = `-- name: PollPayloadWALForRecordsToReplicate :many
WITH tenants AS (
SELECT UNNEST(
@@ -320,6 +424,32 @@ func (q *Queries) SetPayloadExternalKeys(ctx context.Context, db DBTX, arg SetPa
return items, nil
}
const swapV1PayloadPartitionWithTemp = `-- name: SwapV1PayloadPartitionWithTemp :exec
SELECT swap_v1_payload_partition_with_temp($1::DATE)
`
func (q *Queries) SwapV1PayloadPartitionWithTemp(ctx context.Context, db DBTX, date pgtype.Date) error {
_, err := db.Exec(ctx, swapV1PayloadPartitionWithTemp, date)
return err
}
const upsertLastOffsetForCutoverJob = `-- name: UpsertLastOffsetForCutoverJob :exec
INSERT INTO v1_payload_cutover_job_offset (key, last_offset)
VALUES ($1::DATE, $2::BIGINT)
ON CONFLICT (key)
DO UPDATE SET last_offset = EXCLUDED.last_offset
`
type UpsertLastOffsetForCutoverJobParams struct {
Key pgtype.Date `json:"key"`
Lastoffset int64 `json:"lastoffset"`
}
func (q *Queries) UpsertLastOffsetForCutoverJob(ctx context.Context, db DBTX, arg UpsertLastOffsetForCutoverJobParams) error {
_, err := db.Exec(ctx, upsertLastOffsetForCutoverJob, arg.Key, arg.Lastoffset)
return err
}
const writePayloadWAL = `-- name: WritePayloadWAL :exec
WITH inputs AS (
SELECT
@@ -401,6 +531,7 @@ SELECT
i.inline_content
FROM
inputs i
ORDER BY i.tenant_id, i.inserted_at, i.id, i.type
ON CONFLICT (tenant_id, id, inserted_at, type)
DO UPDATE SET
location = EXCLUDED.location,


@@ -1756,6 +1756,254 @@ BEGIN
END;
$$;
CREATE TABLE v1_payload_cutover_job_offset (
key DATE PRIMARY KEY,
last_offset BIGINT NOT NULL,
is_completed BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE OR REPLACE FUNCTION copy_v1_payload_partition_structure(
partition_date date
) RETURNS text
LANGUAGE plpgsql AS
$$
DECLARE
partition_date_str varchar;
source_partition_name varchar;
target_table_name varchar;
trigger_function_name varchar;
trigger_name varchar;
partition_start date;
partition_end date;
BEGIN
SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
SELECT format('v1_payload_offload_tmp_%s', partition_date_str) INTO target_table_name;
SELECT format('sync_to_%s', target_table_name) INTO trigger_function_name;
SELECT format('trigger_sync_to_%s', target_table_name) INTO trigger_name;
partition_start := partition_date;
partition_end := partition_date + INTERVAL '1 day';
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
RAISE EXCEPTION 'Source partition % does not exist', source_partition_name;
END IF;
IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = target_table_name) THEN
RAISE NOTICE 'Target table % already exists, skipping creation', target_table_name;
RETURN target_table_name;
END IF;
EXECUTE format(
'CREATE TABLE %I (LIKE %I INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING INDEXES)',
target_table_name,
source_partition_name
);
EXECUTE format('
ALTER TABLE %I
ADD CONSTRAINT %I
CHECK (
inserted_at IS NOT NULL
AND inserted_at >= %L::TIMESTAMPTZ
AND inserted_at < %L::TIMESTAMPTZ
)
',
target_table_name,
target_table_name || '_iat_chk_bounds',
partition_start,
partition_end
);
EXECUTE format('
CREATE OR REPLACE FUNCTION %I() RETURNS trigger
LANGUAGE plpgsql AS $func$
BEGIN
IF TG_OP = ''INSERT'' THEN
INSERT INTO %I (tenant_id, id, inserted_at, external_id, type, location, external_location_key, inline_content, updated_at)
VALUES (NEW.tenant_id, NEW.id, NEW.inserted_at, NEW.external_id, NEW.type, NEW.location, NEW.external_location_key, NEW.inline_content, NEW.updated_at)
ON CONFLICT (tenant_id, id, inserted_at, type) DO UPDATE
SET
location = EXCLUDED.location,
external_location_key = EXCLUDED.external_location_key,
inline_content = EXCLUDED.inline_content,
updated_at = EXCLUDED.updated_at;
RETURN NEW;
ELSIF TG_OP = ''UPDATE'' THEN
UPDATE %I
SET
location = NEW.location,
external_location_key = NEW.external_location_key,
inline_content = NEW.inline_content,
updated_at = NEW.updated_at
WHERE
tenant_id = NEW.tenant_id
AND id = NEW.id
AND inserted_at = NEW.inserted_at
AND type = NEW.type;
RETURN NEW;
ELSIF TG_OP = ''DELETE'' THEN
DELETE FROM %I
WHERE
tenant_id = OLD.tenant_id
AND id = OLD.id
AND inserted_at = OLD.inserted_at
AND type = OLD.type;
RETURN OLD;
END IF;
RETURN NULL;
END;
$func$;
', trigger_function_name, target_table_name, target_table_name, target_table_name);
EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I', trigger_name, source_partition_name);
EXECUTE format('
CREATE TRIGGER %I
AFTER INSERT OR UPDATE OR DELETE ON %I
FOR EACH ROW
EXECUTE FUNCTION %I();
', trigger_name, source_partition_name, trigger_function_name);
RAISE NOTICE 'Created table % as a copy of partition % with sync trigger', target_table_name, source_partition_name;
RETURN target_table_name;
END;
$$;
CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
partition_date date,
limit_param int,
offset_param bigint
) RETURNS TABLE (
tenant_id UUID,
id BIGINT,
inserted_at TIMESTAMPTZ,
external_id UUID,
type v1_payload_type,
location v1_payload_location,
external_location_key TEXT,
inline_content JSONB,
updated_at TIMESTAMPTZ
)
LANGUAGE plpgsql AS
$$
DECLARE
partition_date_str varchar;
source_partition_name varchar;
query text;
BEGIN
IF partition_date IS NULL THEN
RAISE EXCEPTION 'partition_date parameter cannot be NULL';
END IF;
SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
END IF;
query := format('
SELECT tenant_id, id, inserted_at, external_id, type, location,
external_location_key, inline_content, updated_at
FROM %I
ORDER BY tenant_id, inserted_at, id, type
LIMIT $1
OFFSET $2
', source_partition_name);
RETURN QUERY EXECUTE query USING limit_param, offset_param;
END;
$$;
CREATE OR REPLACE FUNCTION swap_v1_payload_partition_with_temp(
partition_date date
) RETURNS text
LANGUAGE plpgsql AS
$$
DECLARE
partition_date_str varchar;
source_partition_name varchar;
temp_table_name varchar;
old_pk_name varchar;
new_pk_name varchar;
partition_start date;
partition_end date;
trigger_function_name varchar;
trigger_name varchar;
BEGIN
IF partition_date IS NULL THEN
RAISE EXCEPTION 'partition_date parameter cannot be NULL';
END IF;
SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
SELECT format('v1_payload_offload_tmp_%s', partition_date_str) INTO temp_table_name;
SELECT format('v1_payload_offload_tmp_%s_pkey', partition_date_str) INTO old_pk_name;
SELECT format('v1_payload_%s_pkey', partition_date_str) INTO new_pk_name;
SELECT format('sync_to_%s', temp_table_name) INTO trigger_function_name;
SELECT format('trigger_sync_to_%s', temp_table_name) INTO trigger_name;
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = temp_table_name) THEN
RAISE EXCEPTION 'Temp table % does not exist', temp_table_name;
END IF;
partition_start := partition_date;
partition_end := partition_date + INTERVAL '1 day';
EXECUTE format(
'ALTER TABLE %I SET (
autovacuum_vacuum_scale_factor = ''0.1'',
autovacuum_analyze_scale_factor = ''0.05'',
autovacuum_vacuum_threshold = ''25'',
autovacuum_analyze_threshold = ''25'',
autovacuum_vacuum_cost_delay = ''10'',
autovacuum_vacuum_cost_limit = ''1000''
)',
temp_table_name
);
RAISE NOTICE 'Set autovacuum settings on partition %', temp_table_name;
LOCK TABLE v1_payload IN ACCESS EXCLUSIVE MODE;
RAISE NOTICE 'Dropping trigger from partition %', source_partition_name;
EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I', trigger_name, source_partition_name);
RAISE NOTICE 'Dropping trigger function %', trigger_function_name;
EXECUTE format('DROP FUNCTION IF EXISTS %I()', trigger_function_name);
IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
RAISE NOTICE 'Dropping old partition %', source_partition_name;
EXECUTE format('ALTER TABLE v1_payload DETACH PARTITION %I', source_partition_name);
EXECUTE format('DROP TABLE %I CASCADE', source_partition_name);
END IF;
RAISE NOTICE 'Renaming primary key % to %', old_pk_name, new_pk_name;
EXECUTE format('ALTER INDEX %I RENAME TO %I', old_pk_name, new_pk_name);
RAISE NOTICE 'Renaming temp table % to %', temp_table_name, source_partition_name;
EXECUTE format('ALTER TABLE %I RENAME TO %I', temp_table_name, source_partition_name);
RAISE NOTICE 'Attaching new partition % to v1_payload', source_partition_name;
EXECUTE format(
'ALTER TABLE v1_payload ATTACH PARTITION %I FOR VALUES FROM (%L) TO (%L)',
source_partition_name,
partition_start,
partition_end
);
RAISE NOTICE 'Dropping hack check constraint';
EXECUTE format(
'ALTER TABLE %I DROP CONSTRAINT %I',
source_partition_name,
temp_table_name || '_iat_chk_bounds'
);
RAISE NOTICE 'Successfully swapped partition %', source_partition_name;
RETURN source_partition_name;
END;
$$;
CREATE TABLE v1_idempotency_key (
tenant_id UUID NOT NULL,