diff --git a/cmd/hatchet-migrate/migrate/migrations/20251128190238_v1_0_54.sql b/cmd/hatchet-migrate/migrate/migrations/20251128190238_v1_0_54.sql new file mode 100644 index 000000000..fad074ee0 --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20251128190238_v1_0_54.sql @@ -0,0 +1,258 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE v1_payload_cutover_job_offset ( + key DATE PRIMARY KEY, + last_offset BIGINT NOT NULL, + is_completed BOOLEAN NOT NULL DEFAULT FALSE +); + +CREATE OR REPLACE FUNCTION copy_v1_payload_partition_structure( + partition_date date +) RETURNS text + LANGUAGE plpgsql AS +$$ +DECLARE + partition_date_str varchar; + source_partition_name varchar; + target_table_name varchar; + trigger_function_name varchar; + trigger_name varchar; + partition_start date; + partition_end date; +BEGIN + SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str; + SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name; + SELECT format('v1_payload_offload_tmp_%s', partition_date_str) INTO target_table_name; + SELECT format('sync_to_%s', target_table_name) INTO trigger_function_name; + SELECT format('trigger_sync_to_%s', target_table_name) INTO trigger_name; + partition_start := partition_date; + partition_end := partition_date + INTERVAL '1 day'; + + IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN + RAISE EXCEPTION 'Source partition % does not exist', source_partition_name; + END IF; + + IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = target_table_name) THEN + RAISE NOTICE 'Target table % already exists, skipping creation', target_table_name; + RETURN target_table_name; + END IF; + + EXECUTE format( + 'CREATE TABLE %I (LIKE %I INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING INDEXES)', + target_table_name, + source_partition_name + ); + + EXECUTE format(' + ALTER TABLE %I + ADD CONSTRAINT %I + CHECK ( + inserted_at IS NOT NULL + AND inserted_at >= %L::TIMESTAMPTZ + AND inserted_at < %L::TIMESTAMPTZ + ) + ', + target_table_name, + target_table_name || '_iat_chk_bounds', + partition_start, + partition_end + ); + + EXECUTE format(' + CREATE OR REPLACE FUNCTION %I() RETURNS trigger + LANGUAGE plpgsql AS $func$ + BEGIN + IF TG_OP = ''INSERT'' THEN + INSERT INTO %I (tenant_id, id, inserted_at, external_id, type, location, external_location_key, inline_content, updated_at) + VALUES (NEW.tenant_id, NEW.id, NEW.inserted_at, NEW.external_id, NEW.type, NEW.location, NEW.external_location_key, NEW.inline_content, NEW.updated_at) + ON CONFLICT (tenant_id, id, inserted_at, type) DO UPDATE + SET + location = EXCLUDED.location, + external_location_key = EXCLUDED.external_location_key, + inline_content = EXCLUDED.inline_content, + updated_at = EXCLUDED.updated_at; + RETURN NEW; + ELSIF TG_OP = ''UPDATE'' THEN + UPDATE %I + SET + location = NEW.location, + external_location_key = NEW.external_location_key, + inline_content = NEW.inline_content, + updated_at = NEW.updated_at + WHERE + tenant_id = NEW.tenant_id + AND id = NEW.id + AND inserted_at = NEW.inserted_at + AND type = NEW.type; + RETURN NEW; + ELSIF TG_OP = ''DELETE'' THEN + DELETE FROM %I + WHERE + tenant_id = OLD.tenant_id + AND id = OLD.id + AND inserted_at = OLD.inserted_at + AND type = OLD.type; + RETURN OLD; + END IF; + RETURN NULL; + END; + $func$; + ', trigger_function_name, target_table_name, target_table_name, target_table_name); + + EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I', trigger_name, source_partition_name); + + EXECUTE format(' + CREATE 
TRIGGER %I + AFTER INSERT OR UPDATE OR DELETE ON %I + FOR EACH ROW + EXECUTE FUNCTION %I(); + ', trigger_name, source_partition_name, trigger_function_name); + + RAISE NOTICE 'Created table % as a copy of partition % with sync trigger', target_table_name, source_partition_name; + + RETURN target_table_name; +END; +$$; + +CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload( + partition_date date, + limit_param int, + offset_param bigint +) RETURNS TABLE ( + tenant_id UUID, + id BIGINT, + inserted_at TIMESTAMPTZ, + external_id UUID, + type v1_payload_type, + location v1_payload_location, + external_location_key TEXT, + inline_content JSONB, + updated_at TIMESTAMPTZ +) + LANGUAGE plpgsql AS +$$ +DECLARE + partition_date_str varchar; + source_partition_name varchar; + query text; +BEGIN + IF partition_date IS NULL THEN + RAISE EXCEPTION 'partition_date parameter cannot be NULL'; + END IF; + + SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str; + SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name; + + IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN + RAISE EXCEPTION 'Partition % does not exist', source_partition_name; + END IF; + + query := format(' + SELECT tenant_id, id, inserted_at, external_id, type, location, + external_location_key, inline_content, updated_at + FROM %I + ORDER BY tenant_id, inserted_at, id, type + LIMIT $1 + OFFSET $2 + ', source_partition_name); + + RETURN QUERY EXECUTE query USING limit_param, offset_param; +END; +$$; + +CREATE OR REPLACE FUNCTION swap_v1_payload_partition_with_temp( + partition_date date +) RETURNS text + LANGUAGE plpgsql AS +$$ +DECLARE + partition_date_str varchar; + source_partition_name varchar; + temp_table_name varchar; + old_pk_name varchar; + new_pk_name varchar; + partition_start date; + partition_end date; + trigger_function_name varchar; + trigger_name varchar; +BEGIN + IF partition_date IS NULL THEN + RAISE EXCEPTION 'partition_date parameter cannot be NULL'; + END IF; + + SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str; + SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name; + SELECT format('v1_payload_offload_tmp_%s', partition_date_str) INTO temp_table_name; + SELECT format('v1_payload_offload_tmp_%s_pkey', partition_date_str) INTO old_pk_name; + SELECT format('v1_payload_%s_pkey', partition_date_str) INTO new_pk_name; + SELECT format('sync_to_%s', temp_table_name) INTO trigger_function_name; + SELECT format('trigger_sync_to_%s', temp_table_name) INTO trigger_name; + + IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = temp_table_name) THEN + RAISE EXCEPTION 'Temp table % does not exist', temp_table_name; + END IF; + + partition_start := partition_date; + partition_end := partition_date + INTERVAL '1 day'; + + EXECUTE format( + 'ALTER TABLE %I SET ( + autovacuum_vacuum_scale_factor = ''0.1'', + autovacuum_analyze_scale_factor = ''0.05'', + autovacuum_vacuum_threshold = ''25'', + autovacuum_analyze_threshold = ''25'', + autovacuum_vacuum_cost_delay = ''10'', + autovacuum_vacuum_cost_limit = ''1000'' + )', + temp_table_name + ); + RAISE NOTICE 'Set autovacuum settings on partition %', temp_table_name; + + LOCK TABLE v1_payload IN ACCESS EXCLUSIVE MODE; + + RAISE NOTICE 'Dropping trigger from partition %', source_partition_name; + EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I', trigger_name, source_partition_name); + + RAISE NOTICE 'Dropping trigger function %', trigger_function_name; + EXECUTE 
format('DROP FUNCTION IF EXISTS %I()', trigger_function_name); + + IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN + RAISE NOTICE 'Dropping old partition %', source_partition_name; + EXECUTE format('ALTER TABLE v1_payload DETACH PARTITION %I', source_partition_name); + EXECUTE format('DROP TABLE %I CASCADE', source_partition_name); + END IF; + + RAISE NOTICE 'Renaming primary key % to %', old_pk_name, new_pk_name; + EXECUTE format('ALTER INDEX %I RENAME TO %I', old_pk_name, new_pk_name); + + RAISE NOTICE 'Renaming temp table % to %', temp_table_name, source_partition_name; + EXECUTE format('ALTER TABLE %I RENAME TO %I', temp_table_name, source_partition_name); + + RAISE NOTICE 'Attaching new partition % to v1_payload', source_partition_name; + EXECUTE format( + 'ALTER TABLE v1_payload ATTACH PARTITION %I FOR VALUES FROM (%L) TO (%L)', + source_partition_name, + partition_start, + partition_end + ); + + RAISE NOTICE 'Dropping hack check constraint'; + EXECUTE format( + 'ALTER TABLE %I DROP CONSTRAINT %I', + source_partition_name, + temp_table_name || '_iat_chk_bounds' + ); + + RAISE NOTICE 'Successfully swapped partition %', source_partition_name; + RETURN source_partition_name; +END; +$$; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE v1_payload_cutover_job_offset; +DROP FUNCTION copy_v1_payload_partition_structure(date); +DROP FUNCTION list_paginated_payloads_for_offload(date, int, bigint); +DROP FUNCTION swap_v1_payload_partition_with_temp(date); +-- +goose StatementEnd diff --git a/internal/services/controllers/v1/task/controller.go b/internal/services/controllers/v1/task/controller.go index d8a98872d..57c183186 100644 --- a/internal/services/controllers/v1/task/controller.go +++ b/internal/services/controllers/v1/task/controller.go @@ -42,29 +42,27 @@ type TasksController interface { } type TasksControllerImpl struct { - mq msgqueue.MessageQueue - pubBuffer *msgqueue.MQPubBuffer - l *zerolog.Logger - queueLogger *zerolog.Logger - pgxStatsLogger *zerolog.Logger - repo repository.EngineRepository - repov1 v1.Repository - dv datautils.DataDecoderValidator - s gocron.Scheduler - a *hatcheterrors.Wrapped - p *partition.Partition - celParser *cel.CELParser - opsPoolPollInterval time.Duration - opsPoolJitter time.Duration - timeoutTaskOperations *operation.OperationPool - reassignTaskOperations *operation.OperationPool - retryTaskOperations *operation.OperationPool - emitSleepOperations *operation.OperationPool - evictExpiredIdempotencyKeysOperations *operation.OperationPool - processPayloadWALOperations *queueutils.OperationPool[int64] - processPayloadExternalCutoversOperations *queueutils.OperationPool[int64] - replayEnabled bool - analyzeCronInterval time.Duration + mq msgqueue.MessageQueue + pubBuffer *msgqueue.MQPubBuffer + l *zerolog.Logger + queueLogger *zerolog.Logger + pgxStatsLogger *zerolog.Logger + repo repository.EngineRepository + repov1 v1.Repository + dv datautils.DataDecoderValidator + s gocron.Scheduler + a *hatcheterrors.Wrapped + p *partition.Partition + celParser *cel.CELParser + opsPoolPollInterval time.Duration + opsPoolJitter time.Duration + timeoutTaskOperations *operation.OperationPool + reassignTaskOperations *operation.OperationPool + retryTaskOperations *operation.OperationPool + emitSleepOperations *operation.OperationPool + evictExpiredIdempotencyKeysOperations *operation.OperationPool + replayEnabled bool + analyzeCronInterval time.Duration } type TasksControllerOpt func(*TasksControllerOpts) @@ 
-284,9 +282,6 @@ func New(fs ...TasksControllerOpt) (*TasksControllerImpl, error) { opts.repov1.Tasks().DefaultTaskActivityGauge, )) - t.processPayloadWALOperations = queueutils.NewOperationPool(opts.l, timeout, "process payload WAL", t.processPayloadWAL).WithJitter(jitter) - t.processPayloadExternalCutoversOperations = queueutils.NewOperationPool(opts.l, timeout, "process payload external cutovers", t.processPayloadExternalCutovers).WithJitter(jitter) - return t, nil } @@ -332,29 +327,12 @@ func (tc *TasksControllerImpl) Start() (func() error, error) { return nil, wrappedErr } - _, err = tc.s.NewJob( - gocron.DurationJob(tc.repov1.Payloads().WALProcessInterval()), - gocron.NewTask( - tc.runProcessPayloadWAL(spanContext), - ), - ) - - if err != nil { - wrappedErr := fmt.Errorf("could not schedule process payload WAL: %w", err) - - cancel() - span.RecordError(err) - span.SetStatus(codes.Error, "could not run process payload WAL") - span.End() - - return nil, wrappedErr - } - _, err = tc.s.NewJob( gocron.DurationJob(tc.repov1.Payloads().ExternalCutoverProcessInterval()), gocron.NewTask( - tc.runProcessPayloadExternalCutovers(spanContext), + tc.processPayloadExternalCutovers(spanContext), ), + gocron.WithSingletonMode(gocron.LimitModeReschedule), ) if err != nil { diff --git a/internal/services/controllers/v1/task/process_payload_wal.go b/internal/services/controllers/v1/task/process_payload_wal.go index c742fffd5..943de60b0 100644 --- a/internal/services/controllers/v1/task/process_payload_wal.go +++ b/internal/services/controllers/v1/task/process_payload_wal.go @@ -2,40 +2,24 @@ package task import ( "context" + + "github.com/hatchet-dev/hatchet/pkg/telemetry" + "go.opentelemetry.io/otel/codes" ) -func (tc *TasksControllerImpl) runProcessPayloadWAL(ctx context.Context) func() { +func (tc *TasksControllerImpl) processPayloadExternalCutovers(ctx context.Context) func() { return func() { - tc.l.Debug().Msgf("processing payload WAL") + ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.processPayloadExternalCutovers") + defer span.End() - partitions := []int64{0, 1, 2, 3} + tc.l.Debug().Msgf("payload external cutover: processing external cutover payloads") - tc.processPayloadWALOperations.SetPartitions(partitions) + err := tc.repov1.Payloads().CopyOffloadedPayloadsIntoTempTable(ctx) - for _, partitionId := range partitions { - tc.processPayloadWALOperations.RunOrContinue(partitionId) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "could not process external cutover payloads") + tc.l.Error().Err(err).Msg("could not process external cutover payloads") } } } - -func (tc *TasksControllerImpl) processPayloadWAL(ctx context.Context, partitionNumber int64) (bool, error) { - return tc.repov1.Payloads().ProcessPayloadWAL(ctx, partitionNumber, tc.pubBuffer) -} - -func (tc *TasksControllerImpl) runProcessPayloadExternalCutovers(ctx context.Context) func() { - return func() { - tc.l.Debug().Msgf("processing payload external cutovers") - - partitions := []int64{0, 1, 2, 3} - - tc.processPayloadExternalCutoversOperations.SetPartitions(partitions) - - for _, partitionId := range partitions { - tc.processPayloadExternalCutoversOperations.RunOrContinue(partitionId) - } - } -} - -func (tc *TasksControllerImpl) processPayloadExternalCutovers(ctx context.Context, partitionNumber int64) (bool, error) { - return tc.repov1.Payloads().ProcessPayloadExternalCutovers(ctx, partitionNumber) -} diff --git a/pkg/config/loader/loader.go b/pkg/config/loader/loader.go index 1c28be88e..96293dc79 
100644 --- a/pkg/config/loader/loader.go +++ b/pkg/config/loader/loader.go @@ -293,15 +293,17 @@ func (c *ConfigLoader) InitDataLayer() (res *database.Layer, err error) { DurableSleepLimit: scf.Runtime.TaskOperationLimits.DurableSleepLimit, } + inlineStoreTTL := time.Duration(scf.PayloadStore.InlineStoreTTLDays) * 24 * time.Hour + payloadStoreOpts := repov1.PayloadStoreRepositoryOpts{ EnablePayloadDualWrites: scf.PayloadStore.EnablePayloadDualWrites, EnableTaskEventPayloadDualWrites: scf.PayloadStore.EnableTaskEventPayloadDualWrites, EnableOLAPPayloadDualWrites: scf.PayloadStore.EnableOLAPPayloadDualWrites, EnableDagDataPayloadDualWrites: scf.PayloadStore.EnableDagDataPayloadDualWrites, - WALPollLimit: scf.PayloadStore.WALPollLimit, - WALProcessInterval: scf.PayloadStore.WALProcessInterval, ExternalCutoverProcessInterval: scf.PayloadStore.ExternalCutoverProcessInterval, - WALEnabled: scf.PayloadStore.WALEnabled, + ExternalCutoverBatchSize: scf.PayloadStore.ExternalCutoverBatchSize, + InlineStoreTTL: &inlineStoreTTL, + EnableImmediateOffloads: scf.PayloadStore.EnableImmediateOffloads, } statusUpdateOpts := repov1.StatusUpdateBatchSizeLimits{ diff --git a/pkg/config/server/server.go b/pkg/config/server/server.go index b8725845a..dd55ecede 100644 --- a/pkg/config/server/server.go +++ b/pkg/config/server/server.go @@ -642,10 +642,10 @@ type PayloadStoreConfig struct { EnableTaskEventPayloadDualWrites bool `mapstructure:"enableTaskEventPayloadDualWrites" json:"enableTaskEventPayloadDualWrites,omitempty" default:"true"` EnableDagDataPayloadDualWrites bool `mapstructure:"enableDagDataPayloadDualWrites" json:"enableDagDataPayloadDualWrites,omitempty" default:"true"` EnableOLAPPayloadDualWrites bool `mapstructure:"enableOLAPPayloadDualWrites" json:"enableOLAPPayloadDualWrites,omitempty" default:"true"` - WALPollLimit int `mapstructure:"walPollLimit" json:"walPollLimit,omitempty" default:"100"` - WALProcessInterval time.Duration `mapstructure:"walProcessInterval" json:"walProcessInterval,omitempty" default:"15s"` ExternalCutoverProcessInterval time.Duration `mapstructure:"externalCutoverProcessInterval" json:"externalCutoverProcessInterval,omitempty" default:"15s"` - WALEnabled bool `mapstructure:"walEnabled" json:"walEnabled,omitempty" default:"true"` + ExternalCutoverBatchSize int32 `mapstructure:"externalCutoverBatchSize" json:"externalCutoverBatchSize,omitempty" default:"1000"` + InlineStoreTTLDays int32 `mapstructure:"inlineStoreTTLDays" json:"inlineStoreTTLDays,omitempty" default:"2"` + EnableImmediateOffloads bool `mapstructure:"enableImmediateOffloads" json:"enableImmediateOffloads,omitempty" default:"false"` } func (c *ServerConfig) HasService(name string) bool { @@ -920,10 +920,10 @@ func BindAllEnv(v *viper.Viper) { _ = v.BindEnv("payloadStore.enableTaskEventPayloadDualWrites", "SERVER_PAYLOAD_STORE_ENABLE_TASK_EVENT_PAYLOAD_DUAL_WRITES") _ = v.BindEnv("payloadStore.enableDagDataPayloadDualWrites", "SERVER_PAYLOAD_STORE_ENABLE_DAG_DATA_PAYLOAD_DUAL_WRITES") _ = v.BindEnv("payloadStore.enableOLAPPayloadDualWrites", "SERVER_PAYLOAD_STORE_ENABLE_OLAP_PAYLOAD_DUAL_WRITES") - _ = v.BindEnv("payloadStore.walPollLimit", "SERVER_PAYLOAD_STORE_WAL_POLL_LIMIT") - _ = v.BindEnv("payloadStore.walProcessInterval", "SERVER_PAYLOAD_STORE_WAL_PROCESS_INTERVAL") _ = v.BindEnv("payloadStore.externalCutoverProcessInterval", "SERVER_PAYLOAD_STORE_EXTERNAL_CUTOVER_PROCESS_INTERVAL") - _ = v.BindEnv("payloadStore.walEnabled", "SERVER_PAYLOAD_STORE_WAL_ENABLED") + _ = 
v.BindEnv("payloadStore.externalCutoverBatchSize", "SERVER_PAYLOAD_STORE_EXTERNAL_CUTOVER_BATCH_SIZE") + _ = v.BindEnv("payloadStore.inlineStoreTTLDays", "SERVER_PAYLOAD_STORE_INLINE_STORE_TTL_DAYS") + _ = v.BindEnv("payloadStore.enableImmediateOffloads", "SERVER_PAYLOAD_STORE_ENABLE_IMMEDIATE_OFFLOADS") // cron operations options _ = v.BindEnv("cronOperations.taskAnalyzeCronInterval", "SERVER_CRON_OPERATIONS_TASK_ANALYZE_CRON_INTERVAL") diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go index c208d1cb1..60c8b1626 100644 --- a/pkg/repository/v1/payloadstore.go +++ b/pkg/repository/v1/payloadstore.go @@ -6,14 +6,12 @@ import ( "sort" "time" - msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" - "github.com/hatchet-dev/hatchet/pkg/telemetry" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/rs/zerolog" - "go.opentelemetry.io/otel/attribute" ) type StorePayloadOpts struct { @@ -57,19 +55,15 @@ type PayloadStoreRepository interface { Store(ctx context.Context, tx sqlcv1.DBTX, payloads ...StorePayloadOpts) error Retrieve(ctx context.Context, tx sqlcv1.DBTX, opts ...RetrievePayloadOpts) (map[RetrievePayloadOpts][]byte, error) RetrieveFromExternal(ctx context.Context, keys ...ExternalPayloadLocationKey) (map[ExternalPayloadLocationKey][]byte, error) - ProcessPayloadWAL(ctx context.Context, partitionNumber int64, pubBuffer *msgqueue.MQPubBuffer) (bool, error) - ProcessPayloadExternalCutovers(ctx context.Context, partitionNumber int64) (bool, error) - OverwriteExternalStore(store ExternalStore, inlineStoreTTL time.Duration) + OverwriteExternalStore(store ExternalStore) DualWritesEnabled() bool TaskEventDualWritesEnabled() bool DagDataDualWritesEnabled() bool OLAPDualWritesEnabled() bool - WALPollLimit() int - WALProcessInterval() time.Duration - WALEnabled() bool ExternalCutoverProcessInterval() time.Duration ExternalStoreEnabled() bool ExternalStore() ExternalStore + CopyOffloadedPayloadsIntoTempTable(ctx context.Context) error } type payloadStoreRepositoryImpl struct { @@ -83,13 +77,9 @@ type payloadStoreRepositoryImpl struct { enableTaskEventPayloadDualWrites bool enableDagDataPayloadDualWrites bool enableOLAPPayloadDualWrites bool - walPollLimit int - walProcessInterval time.Duration externalCutoverProcessInterval time.Duration - // walEnabled controls the payload storage strategy: - // - true: Use WAL system (inline + async external offload) - // - false: Direct external storage (skip inline, save DB space) - walEnabled bool + externalCutoverBatchSize int32 + enableImmediateOffloads bool } type PayloadStoreRepositoryOpts struct { @@ -97,15 +87,10 @@ type PayloadStoreRepositoryOpts struct { EnableTaskEventPayloadDualWrites bool EnableDagDataPayloadDualWrites bool EnableOLAPPayloadDualWrites bool - WALPollLimit int - WALProcessInterval time.Duration ExternalCutoverProcessInterval time.Duration - // WALEnabled controls the payload storage strategy: - // - true: Use WAL (Write-Ahead Log) system - store payloads inline in PostgreSQL first, - // then asynchronously offload to external storage via WAL processing - // - false: Skip WAL system - store payloads directly to external storage immediately, - // saving database space by not storing inline content in PostgreSQL - WALEnabled bool + ExternalCutoverBatchSize int32 + InlineStoreTTL *time.Duration + EnableImmediateOffloads 
bool } func NewPayloadStoreRepository( @@ -120,16 +105,15 @@ queries: queries, externalStoreEnabled: false, - inlineStoreTTL: nil, + inlineStoreTTL: opts.InlineStoreTTL, externalStore: &NoOpExternalStore{}, enablePayloadDualWrites: opts.EnablePayloadDualWrites, enableTaskEventPayloadDualWrites: opts.EnableTaskEventPayloadDualWrites, enableDagDataPayloadDualWrites: opts.EnableDagDataPayloadDualWrites, enableOLAPPayloadDualWrites: opts.EnableOLAPPayloadDualWrites, - walPollLimit: opts.WALPollLimit, - walProcessInterval: opts.WALProcessInterval, externalCutoverProcessInterval: opts.ExternalCutoverProcessInterval, - walEnabled: opts.WALEnabled, + externalCutoverBatchSize: opts.ExternalCutoverBatchSize, + enableImmediateOffloads: opts.EnableImmediateOffloads, } } @@ -141,7 +125,6 @@ type PayloadUniqueKey struct { } func (p *payloadStoreRepositoryImpl) Store(ctx context.Context, tx sqlcv1.DBTX, payloads ...StorePayloadOpts) error { - // We store payloads as array in order to use PostgreSQL's UNNEST to batch insert them taskIds := make([]int64, 0, len(payloads)) taskInsertedAts := make([]pgtype.Timestamptz, 0, len(payloads)) payloadTypes := make([]string, 0, len(payloads)) @@ -150,7 +133,6 @@ func (p *payloadStoreRepositoryImpl) Store(ctx context.Context, tx sqlcv1.DBTX, tenantIds := make([]pgtype.UUID, 0, len(payloads)) locations := make([]string, 0, len(payloads)) externalIds := make([]pgtype.UUID, 0, len(payloads)) - // If the WAL is disabled, we'll store the external location key in the payloads table right away externalLocationKeys := make([]string, 0, len(payloads)) seenPayloadUniqueKeys := make(map[PayloadUniqueKey]struct{}) @@ -160,9 +142,77 @@ return payloads[i].InsertedAt.Time.After(payloads[j].InsertedAt.Time) }) - // Handle WAL_ENABLED logic - if p.walEnabled { - // WAL_ENABLED = true: insert into payloads table and WAL table + if p.enableImmediateOffloads && p.externalStoreEnabled { + externalOpts := make([]OffloadToExternalStoreOpts, 0, len(payloads)) + + for _, payload := range payloads { + tenantId := sqlchelpers.UUIDFromStr(payload.TenantId) + uniqueKey := PayloadUniqueKey{ + ID: payload.Id, + InsertedAt: payload.InsertedAt, + TenantId: tenantId, + Type: payload.Type, + } + + if _, exists := seenPayloadUniqueKeys[uniqueKey]; exists { + continue + } + + seenPayloadUniqueKeys[uniqueKey] = struct{}{} + + externalOpts = append(externalOpts, OffloadToExternalStoreOpts{ + StorePayloadOpts: &payload, + OffloadAt: time.Now(), // Immediate offload + }) + } + + retrieveOptsToExternalKey, err := p.externalStore.Store(ctx, externalOpts...)
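+ // NOTE: nothing has been written to Postgres at this point, so an error from the + // external store fails the whole Store call without leaving partial database state; + // presumably any objects already written externally are orphaned and overwritten on retry.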
+ if err != nil { + return fmt.Errorf("failed to store in external store: %w", err) + } + + for _, payload := range payloads { + tenantId := sqlchelpers.UUIDFromStr(payload.TenantId) + uniqueKey := PayloadUniqueKey{ + ID: payload.Id, + InsertedAt: payload.InsertedAt, + TenantId: tenantId, + Type: payload.Type, + } + + if _, exists := seenPayloadUniqueKeys[uniqueKey]; !exists { + continue // duplicate occurrence; the first occurrence was already offloaded and recorded + } + + // consume the key so any later duplicates in this batch are skipped, keeping + // the UNNEST insert below free of conflicting duplicate rows + delete(seenPayloadUniqueKeys, uniqueKey) + + retrieveOpts := RetrievePayloadOpts{ + Id: payload.Id, + InsertedAt: payload.InsertedAt, + Type: payload.Type, + TenantId: tenantId, + } + + externalKey, exists := retrieveOptsToExternalKey[retrieveOpts] + if !exists { + return fmt.Errorf("external key not found for payload %d", payload.Id) + } + + taskIds = append(taskIds, payload.Id) + taskInsertedAts = append(taskInsertedAts, payload.InsertedAt) + payloadTypes = append(payloadTypes, string(payload.Type)) + tenantIds = append(tenantIds, tenantId) + locations = append(locations, string(sqlcv1.V1PayloadLocationEXTERNAL)) + inlineContents = append(inlineContents, nil) + externalIds = append(externalIds, payload.ExternalId) + externalLocationKeys = append(externalLocationKeys, string(externalKey)) + offloadAts = append(offloadAts, pgtype.Timestamptz{Time: time.Now(), Valid: true}) + } + } else { + if p.enableImmediateOffloads { + p.l.Warn().Msg("immediate offloads enabled but external store is not enabled, skipping immediate offloads") + } + for _, payload := range payloads { tenantId := sqlchelpers.UUIDFromStr(payload.TenantId) uniqueKey := PayloadUniqueKey{ @@ -187,89 +237,12 @@ func (p *payloadStoreRepositoryImpl) Store(ctx context.Context, tx sqlcv1.DBTX, externalIds = append(externalIds, payload.ExternalId) externalLocationKeys = append(externalLocationKeys, "") - if p.externalStoreEnabled { - offloadAts = append(offloadAts, pgtype.Timestamptz{Time: payload.InsertedAt.Time.Add(*p.inlineStoreTTL), Valid: true}) - } else { - offloadAts = append(offloadAts, pgtype.Timestamptz{Time: time.Now(), Valid: true}) - } - } - } else { - // WAL_ENABLED = false: Skip inline, go directly to external store, we will: - // 1. Prepare external store options for immediate offload - // 2. Call external store first to get storage keys (S3/GCS URLs) - // 3. Process results and prepare for PostgreSQL with external keys only - if !p.externalStoreEnabled { - return fmt.Errorf("external store must be enabled when WAL is disabled") - } - - // 1. Prepare external store options for immediate offload - externalOpts := make([]OffloadToExternalStoreOpts, 0, len(payloads)) - payloadIndexMap := make(map[PayloadUniqueKey]int) // Map to track original payload indices - - for i, payload := range payloads { - tenantId := sqlchelpers.UUIDFromStr(payload.TenantId) - uniqueKey := PayloadUniqueKey{ - ID: payload.Id, - InsertedAt: payload.InsertedAt, - TenantId: tenantId, - Type: payload.Type, + offloadAt := pgtype.Timestamptz{Time: time.Now(), Valid: true} + if p.inlineStoreTTL != nil { + offloadAt = pgtype.Timestamptz{Time: time.Now().Add(*p.inlineStoreTTL), Valid: true} } - if _, exists := seenPayloadUniqueKeys[uniqueKey]; exists { - continue - } - - seenPayloadUniqueKeys[uniqueKey] = struct{}{} - payloadIndexMap[uniqueKey] = i - - externalOpts = append(externalOpts, OffloadToExternalStoreOpts{ - StorePayloadOpts: &payload, - OffloadAt: time.Now(), // Immediate offload - }) - } - - // 2. Call external store first to get storage keys (S3/GCS URLs) - externalKeys, err := p.externalStore.Store(ctx, externalOpts...)
- if err != nil { - return fmt.Errorf("failed to store in external store: %w", err) - } - - // 3. Process results and prepare for PostgreSQL with external keys only - for _, payload := range payloads { - tenantId := sqlchelpers.UUIDFromStr(payload.TenantId) - uniqueKey := PayloadUniqueKey{ - ID: payload.Id, - InsertedAt: payload.InsertedAt, - TenantId: tenantId, - Type: payload.Type, - } - - if _, exists := seenPayloadUniqueKeys[uniqueKey]; !exists { - continue // Skip if already processed - } - - // Find the external key for this payload - retrieveOpts := RetrievePayloadOpts{ - Id: payload.Id, - InsertedAt: payload.InsertedAt, - Type: payload.Type, - TenantId: tenantId, - } - - externalKey, exists := externalKeys[retrieveOpts] - if !exists { - return fmt.Errorf("external key not found for payload %d", payload.Id) - } - - taskIds = append(taskIds, payload.Id) - taskInsertedAts = append(taskInsertedAts, payload.InsertedAt) - payloadTypes = append(payloadTypes, string(payload.Type)) - tenantIds = append(tenantIds, tenantId) - locations = append(locations, string(sqlcv1.V1PayloadLocationEXTERNAL)) - inlineContents = append(inlineContents, nil) // No inline content - externalIds = append(externalIds, payload.ExternalId) - externalLocationKeys = append(externalLocationKeys, string(externalKey)) - offloadAts = append(offloadAts, pgtype.Timestamptz{Time: time.Now(), Valid: true}) + offloadAts = append(offloadAts, offloadAt) } } @@ -288,22 +261,6 @@ func (p *payloadStoreRepositoryImpl) Store(ctx context.Context, tx sqlcv1.DBTX, return fmt.Errorf("failed to write payloads: %w", err) } - // Only write to WAL if external store is configured AND WAL is enabled - // When WAL is disabled, payloads are already in external storage, so no WAL needed - if p.externalStoreEnabled && p.walEnabled { - err = p.queries.WritePayloadWAL(ctx, tx, sqlcv1.WritePayloadWALParams{ - Tenantids: tenantIds, - Payloadids: taskIds, - Payloadinsertedats: taskInsertedAts, - Payloadtypes: payloadTypes, - Offloadats: offloadAts, - }) - - if err != nil { - return fmt.Errorf("failed to write payload WAL: %w", err) - } - } - return err } @@ -393,274 +350,8 @@ func (p *payloadStoreRepositoryImpl) retrieve(ctx context.Context, tx sqlcv1.DBT return optsToPayload, nil } -func (p *payloadStoreRepositoryImpl) offloadToExternal(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error) { - ctx, span := telemetry.NewSpan(ctx, "payloadstore.offload_to_external_store") - defer span.End() - - span.SetAttributes(attribute.Int("payloadstore.offload_to_external_store.num_payloads_to_offload", len(payloads))) - - // this is only intended to be called from ProcessPayloadWAL, which short-circuits if external store is not enabled - if !p.externalStoreEnabled { - return nil, fmt.Errorf("external store not enabled") - } - - return p.externalStore.Store(ctx, payloads...) 
-} - -func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, partitionNumber int64, pubBuffer *msgqueue.MQPubBuffer) (bool, error) { - // no need to process the WAL if external store is not enabled - if !p.externalStoreEnabled { - return false, nil - } - - ctx, span := telemetry.NewSpan(ctx, "payloadstore.process_payload_wal") - defer span.End() - - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 30000) - - if err != nil { - return false, fmt.Errorf("failed to prepare transaction: %w", err) - } - - defer rollback() - - advisoryLockAcquired, err := p.queries.TryAdvisoryLock(ctx, tx, hash(fmt.Sprintf("process-payload-wal-lease-%d", partitionNumber))) - - if err != nil { - return false, fmt.Errorf("failed to acquire advisory lock: %w", err) - } - - if !advisoryLockAcquired { - return false, nil - } - - walRecords, err := p.queries.PollPayloadWALForRecordsToReplicate(ctx, tx, sqlcv1.PollPayloadWALForRecordsToReplicateParams{ - Polllimit: int32(p.walPollLimit), - Partitionnumber: int32(partitionNumber), - }) - - if err != nil { - return false, err - } - - hasMoreWALRecords := len(walRecords) == p.walPollLimit - - if len(walRecords) == 0 { - return false, nil - } - - retrieveOpts := make([]RetrievePayloadOpts, len(walRecords)) - retrieveOptsToOffloadAt := make(map[RetrievePayloadOpts]pgtype.Timestamptz) - retrieveOptsToPayload := make(map[RetrievePayloadOpts][]byte) - - for i, record := range walRecords { - opts := RetrievePayloadOpts{ - Id: record.PayloadID, - InsertedAt: record.PayloadInsertedAt, - Type: record.PayloadType, - TenantId: record.TenantID, - } - - retrieveOpts[i] = opts - retrieveOptsToOffloadAt[opts] = record.OffloadAt - - if record.Location == sqlcv1.V1PayloadLocationINLINE { - retrieveOptsToPayload[opts] = record.InlineContent - } - } - - externalStoreOpts := make([]OffloadToExternalStoreOpts, 0) - minOffloadAt := time.Now().Add(100 * time.Hour) - offloadLag := time.Since(minOffloadAt).Seconds() - - attrs := []attribute.KeyValue{ - { - Key: "payloadstore.process_payload_wal.payload_wal_offload_partition_number", - Value: attribute.Int64Value(partitionNumber), - }, - { - Key: "payloadstore.process_payload_wal.payload_wal_offload_count", - Value: attribute.IntValue(len(retrieveOpts)), - }, - { - Key: "payloadstore.process_payload_wal.payload_wal_offload_lag_seconds", - Value: attribute.Float64Value(offloadLag), - }, - } - - span.SetAttributes(attrs...) - - for _, opts := range retrieveOpts { - offloadAt, ok := retrieveOptsToOffloadAt[opts] - - if !ok { - return false, fmt.Errorf("offload at not found for opts: %+v", opts) - } - - externalStoreOpts = append(externalStoreOpts, OffloadToExternalStoreOpts{ - StorePayloadOpts: &StorePayloadOpts{ - Id: opts.Id, - InsertedAt: opts.InsertedAt, - Type: opts.Type, - Payload: retrieveOptsToPayload[opts], - TenantId: opts.TenantId.String(), - }, - OffloadAt: offloadAt.Time, - }) - - if offloadAt.Time.Before(minOffloadAt) { - minOffloadAt = offloadAt.Time - } - } - - if err := commit(ctx); err != nil { - return false, err - } - - retrieveOptsToStoredKey, err := p.offloadToExternal(ctx, externalStoreOpts...) 
- - if err != nil { - return false, err - } - - offloadAts := make([]pgtype.Timestamptz, 0, len(retrieveOptsToStoredKey)) - ids := make([]int64, 0, len(retrieveOptsToStoredKey)) - insertedAts := make([]pgtype.Timestamptz, 0, len(retrieveOptsToStoredKey)) - types := make([]string, 0, len(retrieveOptsToStoredKey)) - tenantIds := make([]pgtype.UUID, 0, len(retrieveOptsToStoredKey)) - externalLocationKeys := make([]string, 0, len(retrieveOptsToStoredKey)) - - for _, opt := range retrieveOpts { - offloadAt, exists := retrieveOptsToOffloadAt[opt] - - if !exists { - return false, fmt.Errorf("offload at not found for opts: %+v", opt) - } - - offloadAts = append(offloadAts, offloadAt) - ids = append(ids, opt.Id) - insertedAts = append(insertedAts, opt.InsertedAt) - types = append(types, string(opt.Type)) - - key, ok := retrieveOptsToStoredKey[opt] - if !ok { - // important: if there's no key here, it's likely because the payloads table did not contain the payload - // this is okay - it can happen if e.g. a payload partition is dropped before the WAL is processed (not a great situation, but not catastrophic) - // if this happens, we log an error and set the key to `""` which will allow it to be evicted from the WAL. it'll never cause - // an update in the payloads table because there won't be a matching row - p.l.Error().Int64("id", opt.Id).Time("insertedAt", opt.InsertedAt.Time).Msg("external location key not found for opts") - key = "" - } - - externalLocationKeys = append(externalLocationKeys, string(key)) - tenantIds = append(tenantIds, opt.TenantId) - } - - // Second transaction, persist the offload to the db once we've successfully offloaded to the external store - tx, commit, rollback, err = sqlchelpers.PrepareTx(ctx, p.pool, p.l, 5000) - defer rollback() - - if err != nil { - return false, fmt.Errorf("failed to prepare transaction for offloading: %w", err) - } - - updatedPayloads, err := p.queries.SetPayloadExternalKeys(ctx, tx, sqlcv1.SetPayloadExternalKeysParams{ - Ids: ids, - Insertedats: insertedAts, - Payloadtypes: types, - Offloadats: offloadAts, - Tenantids: tenantIds, - Externallocationkeys: externalLocationKeys, - }) - - if err != nil { - return false, err - } - - tenantIdToPayloads := make(map[string][]OLAPPayloadToOffload) - - for _, updatedPayload := range updatedPayloads { - if updatedPayload == nil || updatedPayload.Type == sqlcv1.V1PayloadTypeTASKEVENTDATA { - continue - } - - tenantIdToPayloads[updatedPayload.TenantID.String()] = append(tenantIdToPayloads[updatedPayload.TenantID.String()], OLAPPayloadToOffload{ - ExternalId: updatedPayload.ExternalID, - ExternalLocationKey: updatedPayload.ExternalLocationKey.String, - }) - } - - if err := commit(ctx); err != nil { - return false, err - } - - // todo: make this transactionally safe - // there's no application-level risk here because the worst case if - // we miss an event is we don't mark the payload as external and there's a bit - // of disk bloat, but it'd be good to not need to worry about that - for tenantId, payloads := range tenantIdToPayloads { - msg, err := OLAPPayloadOffloadMessage(tenantId, payloads) - if err != nil { - return false, fmt.Errorf("failed to create OLAP payload offload message: %w", err) - } - pubBuffer.Pub(ctx, msgqueue.OLAP_QUEUE, msg, false) - } - - return hasMoreWALRecords, nil -} - -func (p *payloadStoreRepositoryImpl) ProcessPayloadExternalCutovers(ctx context.Context, partitionNumber int64) (bool, error) { - // no need to cut over if external store is not enabled - if !p.externalStoreEnabled { - 
return false, nil - } - - ctx, span := telemetry.NewSpan(ctx, "payloadstore.process_payload_external_cutovers") - defer span.End() - - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 30000) - - if err != nil { - return false, fmt.Errorf("failed to prepare transaction: %w", err) - } - - defer rollback() - - advisoryLockAcquired, err := p.queries.TryAdvisoryLock(ctx, tx, hash(fmt.Sprintf("process-payload-cut-overs-lease-%d", partitionNumber))) - - if err != nil { - return false, fmt.Errorf("failed to acquire advisory lock: %w", err) - } - - if !advisoryLockAcquired { - return false, nil - } - - queueItemsCutOver, err := p.queries.CutOverPayloadsToExternal(ctx, tx, sqlcv1.CutOverPayloadsToExternalParams{ - Polllimit: int32(p.walPollLimit), // nolint: gosec - Partitionnumber: int32(partitionNumber), // nolint: gosec - }) - - if err != nil { - return false, err - } - - if queueItemsCutOver == 0 { - return false, nil - } - - hasMoreQueueItems := int(queueItemsCutOver) == p.walPollLimit - - if err := commit(ctx); err != nil { - return false, err - } - - return hasMoreQueueItems, nil -} - -func (p *payloadStoreRepositoryImpl) OverwriteExternalStore(store ExternalStore, inlineStoreTTL time.Duration) { +func (p *payloadStoreRepositoryImpl) OverwriteExternalStore(store ExternalStore) { p.externalStoreEnabled = true - p.inlineStoreTTL = &inlineStoreTTL p.externalStore = store } @@ -680,14 +371,6 @@ func (p *payloadStoreRepositoryImpl) OLAPDualWritesEnabled() bool { return p.enableOLAPPayloadDualWrites } -func (p *payloadStoreRepositoryImpl) WALPollLimit() int { - return p.walPollLimit -} - -func (p *payloadStoreRepositoryImpl) WALProcessInterval() time.Duration { - return p.walProcessInterval -} - func (p *payloadStoreRepositoryImpl) ExternalCutoverProcessInterval() time.Duration { return p.externalCutoverProcessInterval } @@ -700,8 +383,295 @@ func (p *payloadStoreRepositoryImpl) ExternalStore() ExternalStore { return p.externalStore } -func (p *payloadStoreRepositoryImpl) WALEnabled() bool { - return p.walEnabled +type BulkCutOverPayload struct { + TenantID pgtype.UUID + Id int64 + InsertedAt pgtype.Timestamptz + ExternalId pgtype.UUID + Type sqlcv1.V1PayloadType + ExternalLocationKey ExternalPayloadLocationKey +} + +type CutoverBatchOutcome struct { + ShouldContinue bool + NextOffset int64 +} + +func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Context, partitionDate pgtype.Date, partitionDateStr string, offset int64) (*CutoverBatchOutcome, error) { + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000) + + if err != nil { + return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err) + } + + defer rollback() + + tableName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDateStr) + payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedPayloadsForOffloadParams{ + Partitiondate: partitionDate, + Offsetparam: offset, + Limitparam: p.externalCutoverBatchSize, + }) + + if err != nil { + return nil, fmt.Errorf("failed to list payloads for offload: %w", err) + } + + alreadyExternalPayloads := make(map[RetrievePayloadOpts]ExternalPayloadLocationKey) + offloadOpts := make([]OffloadToExternalStoreOpts, 0, len(payloads)) + retrieveOptsToExternalId := make(map[RetrievePayloadOpts]string) + + for _, payload := range payloads { + if payload.Location != sqlcv1.V1PayloadLocationINLINE { + retrieveOpt := RetrievePayloadOpts{ + Id: payload.ID, + InsertedAt: 
payload.InsertedAt, + Type: payload.Type, + TenantId: payload.TenantID, + } + + alreadyExternalPayloads[retrieveOpt] = ExternalPayloadLocationKey(payload.ExternalLocationKey) + retrieveOptsToExternalId[retrieveOpt] = payload.ExternalID.String() + } else { + offloadOpts = append(offloadOpts, OffloadToExternalStoreOpts{ + StorePayloadOpts: &StorePayloadOpts{ + Id: payload.ID, + InsertedAt: payload.InsertedAt, + Type: payload.Type, + Payload: payload.InlineContent, + TenantId: payload.TenantID.String(), + ExternalId: payload.ExternalID, + }, + OffloadAt: time.Now(), + }) + retrieveOptsToExternalId[RetrievePayloadOpts{ + Id: payload.ID, + InsertedAt: payload.InsertedAt, + Type: payload.Type, + TenantId: payload.TenantID, + }] = payload.ExternalID.String() + } + } + + retrieveOptsToKey, err := p.ExternalStore().Store(ctx, offloadOpts...) + + if err != nil { + return nil, fmt.Errorf("failed to offload payloads to external store: %w", err) + } + + for r, k := range alreadyExternalPayloads { + retrieveOptsToKey[r] = k + } + + payloadsToInsert := make([]sqlcv1.CutoverPayloadToInsert, 0, len(payloads)) + + for r, k := range retrieveOptsToKey { + externalId := retrieveOptsToExternalId[r] + + payloadsToInsert = append(payloadsToInsert, sqlcv1.CutoverPayloadToInsert{ + TenantID: r.TenantId, + ID: r.Id, + InsertedAt: r.InsertedAt, + ExternalID: sqlchelpers.UUIDFromStr(externalId), + Type: r.Type, + ExternalLocationKey: string(k), + }) + } + + _, err = sqlcv1.InsertCutOverPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert) + + if err != nil { + return nil, fmt.Errorf("failed to copy offloaded payloads into temp table: %w", err) + } + + offset += int64(len(payloads)) + + err = p.queries.UpsertLastOffsetForCutoverJob(ctx, tx, sqlcv1.UpsertLastOffsetForCutoverJobParams{ + Key: partitionDate, + Lastoffset: offset, + }) + + if err != nil { + return nil, fmt.Errorf("failed to upsert last offset for cutover job: %w", err) + } + + if err := commit(ctx); err != nil { + return nil, fmt.Errorf("failed to commit copy offloaded payloads transaction: %w", err) + } + + if len(payloads) < int(p.externalCutoverBatchSize) { + return &CutoverBatchOutcome{ + ShouldContinue: false, + NextOffset: offset, + }, nil + } + + return &CutoverBatchOutcome{ + ShouldContinue: true, + NextOffset: offset, + }, nil +} + +type CutoverJobRunMetadata struct { + ShouldRun bool + LastOffset int64 + PartitionDate pgtype.Date + PartitionDateStr string +} + +func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context) (*CutoverJobRunMetadata, error) { + if p.inlineStoreTTL == nil { + return nil, fmt.Errorf("inline store TTL is not set") + } + + partitionTime := time.Now().Add(-1 * *p.inlineStoreTTL) + partitionDate := pgtype.Date{ + Time: partitionTime, + Valid: true, + } + partitionDateStr := partitionTime.Format("20060102") + + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000) + + if err != nil { + return nil, err + } + + defer rollback() + + hashKey := fmt.Sprintf("payload-cutover-temp-table-lease-%s", partitionDateStr) + + lockAcquired, err := p.queries.TryAdvisoryLock(ctx, tx, hash(hashKey)) + + if err != nil { + return nil, fmt.Errorf("failed to acquire advisory lock for payload cutover temp table: %w", err) + } + + if !lockAcquired { + return &CutoverJobRunMetadata{ + ShouldRun: false, + LastOffset: 0, + PartitionDate: partitionDate, + PartitionDateStr: partitionDateStr, + }, nil + } + + jobStatus, err := p.queries.FindLastOffsetForCutoverJob(ctx, p.pool, partitionDate) + + var offset 
int64 + var isCompleted bool + + if err != nil { + if err == pgx.ErrNoRows { + offset = 0 + isCompleted = false + } else { + return nil, fmt.Errorf("failed to find last offset for cutover job: %w", err) + } + } else { + offset = jobStatus.LastOffset + isCompleted = jobStatus.IsCompleted + } + + if isCompleted { + return &CutoverJobRunMetadata{ + ShouldRun: false, + LastOffset: offset, + PartitionDate: partitionDate, + PartitionDateStr: partitionDateStr, + }, nil + } + + err = p.queries.CreateV1PayloadCutoverTemporaryTable(ctx, tx, partitionDate) + + if err != nil { + return nil, fmt.Errorf("failed to create payload cutover temporary table: %w", err) + } + + if err := commit(ctx); err != nil { + return nil, fmt.Errorf("failed to commit copy offloaded payloads transaction: %w", err) + } + + return &CutoverJobRunMetadata{ + ShouldRun: true, + LastOffset: offset, + PartitionDate: partitionDate, + PartitionDateStr: partitionDateStr, + }, nil +} + +func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx context.Context) error { + if !p.externalStoreEnabled { + return nil + } + + jobMeta, err := p.prepareCutoverTableJob(ctx) + + if err != nil { + return fmt.Errorf("failed to prepare cutover table job: %w", err) + } + + if !jobMeta.ShouldRun { + return nil + } + + partitionDate := jobMeta.PartitionDate + partitionDateStr := jobMeta.PartitionDateStr + offset := jobMeta.LastOffset + + for { + outcome, err := p.ProcessPayloadCutoverBatch(ctx, partitionDate, partitionDateStr, offset) + + if err != nil { + return fmt.Errorf("failed to process payload cutover batch: %w", err) + } + + if !outcome.ShouldContinue { + break + } + + offset = outcome.NextOffset + } + + tempPartitionName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDateStr) + sourcePartitionName := fmt.Sprintf("v1_payload_%s", partitionDateStr) + + countsEqual, err := sqlcv1.ComparePartitionRowCounts(ctx, p.pool, tempPartitionName, sourcePartitionName) + + if err != nil { + return fmt.Errorf("failed to compare partition row counts: %w", err) + } + + if !countsEqual { + return fmt.Errorf("row counts do not match between temp and source partitions for date %s", partitionDateStr) + } + + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000) + + if err != nil { + return fmt.Errorf("failed to prepare transaction for swapping payload cutover temp table: %w", err) + } + + defer rollback() + + err = p.queries.SwapV1PayloadPartitionWithTemp(ctx, tx, partitionDate) + + if err != nil { + return fmt.Errorf("failed to swap payload cutover temp table: %w", err) + } + + err = p.queries.MarkCutoverJobAsCompleted(ctx, tx, partitionDate) + + if err != nil { + return fmt.Errorf("failed to mark cutover job as completed: %w", err) + } + + if err := commit(ctx); err != nil { + return fmt.Errorf("failed to commit swap payload cutover temp table transaction: %w", err) + } + + return nil + } type NoOpExternalStore struct{} diff --git a/pkg/repository/v1/repository.go b/pkg/repository/v1/repository.go index f92c61008..1a80b13c3 100644 --- a/pkg/repository/v1/repository.go +++ b/pkg/repository/v1/repository.go @@ -27,7 +27,7 @@ type Repository interface { Logs() LogLineRepository OverwriteLogsRepository(l LogLineRepository) Payloads() PayloadStoreRepository - OverwriteExternalPayloadStore(o ExternalStore, nativeStoreTTL time.Duration) + OverwriteExternalPayloadStore(o ExternalStore) Workers() WorkerRepository Workflows() WorkflowRepository Ticker() TickerRepository @@ -126,8 +126,8 @@ func (r *repositoryImpl) Payloads() 
PayloadStoreRepository { return r.payloadStore } -func (r *repositoryImpl) OverwriteExternalPayloadStore(o ExternalStore, nativeStoreTTL time.Duration) { - r.payloadStore.OverwriteExternalStore(o, nativeStoreTTL) +func (r *repositoryImpl) OverwriteExternalPayloadStore(o ExternalStore) { + r.payloadStore.OverwriteExternalStore(o) } func (r *repositoryImpl) Workers() WorkerRepository { diff --git a/pkg/repository/v1/sqlcv1/models.go b/pkg/repository/v1/sqlcv1/models.go index e194467ba..35689175d 100644 --- a/pkg/repository/v1/sqlcv1/models.go +++ b/pkg/repository/v1/sqlcv1/models.go @@ -3129,6 +3129,12 @@ type V1Payload struct { UpdatedAt pgtype.Timestamptz `json:"updated_at"` } +type V1PayloadCutoverJobOffset struct { + Key pgtype.Date `json:"key"` + LastOffset int64 `json:"last_offset"` + IsCompleted bool `json:"is_completed"` +} + type V1PayloadCutoverQueueItem struct { TenantID pgtype.UUID `json:"tenant_id"` CutOverAt pgtype.Timestamptz `json:"cut_over_at"` diff --git a/pkg/repository/v1/sqlcv1/payload-store-overwrite.sql.go b/pkg/repository/v1/sqlcv1/payload-store-overwrite.sql.go new file mode 100644 index 000000000..f94aaa766 --- /dev/null +++ b/pkg/repository/v1/sqlcv1/payload-store-overwrite.sql.go @@ -0,0 +1,115 @@ +package sqlcv1 + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5/pgtype" +) + +type CutoverPayloadToInsert struct { + TenantID pgtype.UUID + ID int64 + InsertedAt pgtype.Timestamptz + ExternalID pgtype.UUID + Type V1PayloadType + ExternalLocationKey string +} + +func InsertCutOverPayloadsIntoTempTable(ctx context.Context, tx DBTX, tableName string, payloads []CutoverPayloadToInsert) (int64, error) { + tenantIds := make([]pgtype.UUID, 0, len(payloads)) + ids := make([]int64, 0, len(payloads)) + insertedAts := make([]pgtype.Timestamptz, 0, len(payloads)) + externalIds := make([]pgtype.UUID, 0, len(payloads)) + types := make([]string, 0, len(payloads)) + locations := make([]string, 0, len(payloads)) + externalLocationKeys := make([]string, 0, len(payloads)) + + for _, payload := range payloads { + externalIds = append(externalIds, payload.ExternalID) + tenantIds = append(tenantIds, payload.TenantID) + ids = append(ids, payload.ID) + insertedAts = append(insertedAts, payload.InsertedAt) + types = append(types, string(payload.Type)) + locations = append(locations, string(V1PayloadLocationEXTERNAL)) + externalLocationKeys = append(externalLocationKeys, string(payload.ExternalLocationKey)) + } + + row := tx.QueryRow( + ctx, + fmt.Sprintf( + // we unfortunately need to use `INSERT INTO` instead of `COPY` here + // because we can't have conflict resolution with `COPY`. 
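+ // The ORDER BY in the insert below mirrors the ORDER BY added to WritePayloads in + // payload-store.sql; sorting rows before the upsert presumably keeps lock acquisition + // order consistent across concurrent writers and reduces deadlock risk.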
+ ` + WITH inputs AS ( + SELECT + UNNEST($1::UUID[]) AS tenant_id, + UNNEST($2::BIGINT[]) AS id, + UNNEST($3::TIMESTAMPTZ[]) AS inserted_at, + UNNEST($4::UUID[]) AS external_id, + UNNEST($5::TEXT[]) AS type, + UNNEST($6::TEXT[]) AS location, + UNNEST($7::TEXT[]) AS external_location_key + ), inserts AS ( + INSERT INTO %s (tenant_id, id, inserted_at, external_id, type, location, external_location_key, inline_content, updated_at) + SELECT + tenant_id, + id, + inserted_at, + external_id, + type::v1_payload_type, + location::v1_payload_location, + external_location_key, + NULL, + NOW() + FROM inputs + ORDER BY tenant_id, inserted_at, id, type + ON CONFLICT(tenant_id, id, inserted_at, type) DO NOTHING + RETURNING * + ) + + SELECT COUNT(*) + FROM inserts + `, + tableName, + ), + tenantIds, + ids, + insertedAts, + externalIds, + types, + locations, + externalLocationKeys, + ) + + var copyCount int64 + err := row.Scan(&copyCount) + + return copyCount, err +} + +func ComparePartitionRowCounts(ctx context.Context, tx DBTX, tempPartitionName, sourcePartitionName string) (bool, error) { + row := tx.QueryRow( + ctx, + fmt.Sprintf( + ` + SELECT + (SELECT COUNT(*) FROM %s) AS temp_partition_count, + (SELECT COUNT(*) FROM %s) AS source_partition_count + `, + tempPartitionName, + sourcePartitionName, + ), + ) + + var tempPartitionCount int64 + var sourcePartitionCount int64 + + err := row.Scan(&tempPartitionCount, &sourcePartitionCount) + + if err != nil { + return false, err + } + + return tempPartitionCount == sourcePartitionCount, nil +} diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql b/pkg/repository/v1/sqlcv1/payload-store.sql index b7ba81fde..e41c79f8e 100644 --- a/pkg/repository/v1/sqlcv1/payload-store.sql +++ b/pkg/repository/v1/sqlcv1/payload-store.sql @@ -49,6 +49,7 @@ SELECT i.inline_content FROM inputs i +ORDER BY i.tenant_id, i.inserted_at, i.id, i.type ON CONFLICT (tenant_id, id, inserted_at, type) DO UPDATE SET location = EXCLUDED.location, @@ -214,3 +215,49 @@ FROM queue_items -- name: AnalyzeV1Payload :exec ANALYZE v1_payload; + +-- name: ListPaginatedPayloadsForOffload :many +WITH payloads AS ( + SELECT + (p).* + FROM list_paginated_payloads_for_offload( + @partitionDate::DATE, + @limitParam::INT, + @offsetParam::BIGINT + ) p +) +SELECT + tenant_id::UUID, + id::BIGINT, + inserted_at::TIMESTAMPTZ, + external_id::UUID, + type::v1_payload_type, + location::v1_payload_location, + COALESCE(external_location_key, '')::TEXT AS external_location_key, + inline_content::JSONB AS inline_content, + updated_at::TIMESTAMPTZ +FROM payloads; + +-- name: CreateV1PayloadCutoverTemporaryTable :exec +SELECT copy_v1_payload_partition_structure(@date::DATE); + +-- name: SwapV1PayloadPartitionWithTemp :exec +SELECT swap_v1_payload_partition_with_temp(@date::DATE); + +-- name: FindLastOffsetForCutoverJob :one +SELECT * +FROM v1_payload_cutover_job_offset +WHERE key = @key::DATE; + +-- name: UpsertLastOffsetForCutoverJob :exec +INSERT INTO v1_payload_cutover_job_offset (key, last_offset) +VALUES (@key::DATE, @lastOffset::BIGINT) +ON CONFLICT (key) +DO UPDATE SET last_offset = EXCLUDED.last_offset +; + +-- name: MarkCutoverJobAsCompleted :exec +UPDATE v1_payload_cutover_job_offset +SET is_completed = TRUE +WHERE key = @key::DATE +; diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql.go b/pkg/repository/v1/sqlcv1/payload-store.sql.go index a44f8bf44..e88022550 100644 --- a/pkg/repository/v1/sqlcv1/payload-store.sql.go +++ b/pkg/repository/v1/sqlcv1/payload-store.sql.go @@ -20,6 +20,15 @@ func (q *Queries) 
AnalyzeV1Payload(ctx context.Context, db DBTX) error { return err } +const createV1PayloadCutoverTemporaryTable = `-- name: CreateV1PayloadCutoverTemporaryTable :exec +SELECT copy_v1_payload_partition_structure($1::DATE) +` + +func (q *Queries) CreateV1PayloadCutoverTemporaryTable(ctx context.Context, db DBTX, date pgtype.Date) error { + _, err := db.Exec(ctx, createV1PayloadCutoverTemporaryTable, date) + return err +} + const cutOverPayloadsToExternal = `-- name: CutOverPayloadsToExternal :one WITH tenants AS ( SELECT UNNEST( @@ -74,6 +83,101 @@ func (q *Queries) CutOverPayloadsToExternal(ctx context.Context, db DBTX, arg Cu return count, err } +const findLastOffsetForCutoverJob = `-- name: FindLastOffsetForCutoverJob :one +SELECT key, last_offset, is_completed +FROM v1_payload_cutover_job_offset +WHERE key = $1::DATE +` + +func (q *Queries) FindLastOffsetForCutoverJob(ctx context.Context, db DBTX, key pgtype.Date) (*V1PayloadCutoverJobOffset, error) { + row := db.QueryRow(ctx, findLastOffsetForCutoverJob, key) + var i V1PayloadCutoverJobOffset + err := row.Scan(&i.Key, &i.LastOffset, &i.IsCompleted) + return &i, err +} + +const listPaginatedPayloadsForOffload = `-- name: ListPaginatedPayloadsForOffload :many +WITH payloads AS ( + SELECT + (p).* + FROM list_paginated_payloads_for_offload( + $1::DATE, + $2::INT, + $3::BIGINT + ) p +) +SELECT + tenant_id::UUID, + id::BIGINT, + inserted_at::TIMESTAMPTZ, + external_id::UUID, + type::v1_payload_type, + location::v1_payload_location, + COALESCE(external_location_key, '')::TEXT AS external_location_key, + inline_content::JSONB AS inline_content, + updated_at::TIMESTAMPTZ +FROM payloads +` + +type ListPaginatedPayloadsForOffloadParams struct { + Partitiondate pgtype.Date `json:"partitiondate"` + Limitparam int32 `json:"limitparam"` + Offsetparam int64 `json:"offsetparam"` +} + +type ListPaginatedPayloadsForOffloadRow struct { + TenantID pgtype.UUID `json:"tenant_id"` + ID int64 `json:"id"` + InsertedAt pgtype.Timestamptz `json:"inserted_at"` + ExternalID pgtype.UUID `json:"external_id"` + Type V1PayloadType `json:"type"` + Location V1PayloadLocation `json:"location"` + ExternalLocationKey string `json:"external_location_key"` + InlineContent []byte `json:"inline_content"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + +func (q *Queries) ListPaginatedPayloadsForOffload(ctx context.Context, db DBTX, arg ListPaginatedPayloadsForOffloadParams) ([]*ListPaginatedPayloadsForOffloadRow, error) { + rows, err := db.Query(ctx, listPaginatedPayloadsForOffload, arg.Partitiondate, arg.Limitparam, arg.Offsetparam) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*ListPaginatedPayloadsForOffloadRow + for rows.Next() { + var i ListPaginatedPayloadsForOffloadRow + if err := rows.Scan( + &i.TenantID, + &i.ID, + &i.InsertedAt, + &i.ExternalID, + &i.Type, + &i.Location, + &i.ExternalLocationKey, + &i.InlineContent, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const markCutoverJobAsCompleted = `-- name: MarkCutoverJobAsCompleted :exec +UPDATE v1_payload_cutover_job_offset +SET is_completed = TRUE +WHERE key = $1::DATE +` + +func (q *Queries) MarkCutoverJobAsCompleted(ctx context.Context, db DBTX, key pgtype.Date) error { + _, err := db.Exec(ctx, markCutoverJobAsCompleted, key) + return err +} + const pollPayloadWALForRecordsToReplicate = `-- name: PollPayloadWALForRecordsToReplicate :many WITH tenants AS ( 
 const pollPayloadWALForRecordsToReplicate = `-- name: PollPayloadWALForRecordsToReplicate :many
 WITH tenants AS (
     SELECT UNNEST(
@@ -320,6 +424,32 @@ func (q *Queries) SetPayloadExternalKeys(ctx context.Context, db DBTX, arg SetPa
 	return items, nil
 }
 
+const swapV1PayloadPartitionWithTemp = `-- name: SwapV1PayloadPartitionWithTemp :exec
+SELECT swap_v1_payload_partition_with_temp($1::DATE)
+`
+
+func (q *Queries) SwapV1PayloadPartitionWithTemp(ctx context.Context, db DBTX, date pgtype.Date) error {
+	_, err := db.Exec(ctx, swapV1PayloadPartitionWithTemp, date)
+	return err
+}
+
+const upsertLastOffsetForCutoverJob = `-- name: UpsertLastOffsetForCutoverJob :exec
+INSERT INTO v1_payload_cutover_job_offset (key, last_offset)
+VALUES ($1::DATE, $2::BIGINT)
+ON CONFLICT (key)
+DO UPDATE SET last_offset = EXCLUDED.last_offset
+`
+
+type UpsertLastOffsetForCutoverJobParams struct {
+	Key        pgtype.Date `json:"key"`
+	Lastoffset int64       `json:"lastoffset"`
+}
+
+func (q *Queries) UpsertLastOffsetForCutoverJob(ctx context.Context, db DBTX, arg UpsertLastOffsetForCutoverJobParams) error {
+	_, err := db.Exec(ctx, upsertLastOffsetForCutoverJob, arg.Key, arg.Lastoffset)
+	return err
+}
+
 const writePayloadWAL = `-- name: WritePayloadWAL :exec
 WITH inputs AS (
     SELECT
@@ -401,6 +531,7 @@ SELECT
     i.inline_content
 FROM inputs i
+ORDER BY i.tenant_id, i.inserted_at, i.id, i.type
 ON CONFLICT (tenant_id, id, inserted_at, type) DO UPDATE
 SET
     location = EXCLUDED.location,
diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql
index 129cf6490..39a97a5bb 100644
--- a/sql/schema/v1-core.sql
+++ b/sql/schema/v1-core.sql
@@ -1756,6 +1756,254 @@ BEGIN
 END;
 $$;
 
+CREATE TABLE v1_payload_cutover_job_offset (
+    key DATE PRIMARY KEY,
+    last_offset BIGINT NOT NULL,
+    is_completed BOOLEAN NOT NULL DEFAULT FALSE
+);
+
+CREATE OR REPLACE FUNCTION copy_v1_payload_partition_structure(
+    partition_date date
+) RETURNS text
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    target_table_name varchar;
+    trigger_function_name varchar;
+    trigger_name varchar;
+    partition_start date;
+    partition_end date;
+BEGIN
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
+    SELECT format('v1_payload_offload_tmp_%s', partition_date_str) INTO target_table_name;
+    SELECT format('sync_to_%s', target_table_name) INTO trigger_function_name;
+    SELECT format('trigger_sync_to_%s', target_table_name) INTO trigger_name;
+    partition_start := partition_date;
+    partition_end := partition_date + INTERVAL '1 day';
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Source partition % does not exist', source_partition_name;
+    END IF;
+
+    IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = target_table_name) THEN
+        RAISE NOTICE 'Target table % already exists, skipping creation', target_table_name;
+        RETURN target_table_name;
+    END IF;
+
+    EXECUTE format(
+        'CREATE TABLE %I (LIKE %I INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING INDEXES)',
+        target_table_name,
+        source_partition_name
+    );
+
+    EXECUTE format('
+        ALTER TABLE %I
+        ADD CONSTRAINT %I
+        CHECK (
+            inserted_at IS NOT NULL
+            AND inserted_at >= %L::TIMESTAMPTZ
+            AND inserted_at < %L::TIMESTAMPTZ
+        )
+        ',
+        target_table_name,
+        target_table_name || '_iat_chk_bounds',
+        partition_start,
+        partition_end
+    );
+
+    EXECUTE format('
+        CREATE OR REPLACE FUNCTION %I() RETURNS trigger
+            LANGUAGE plpgsql AS $func$
+        BEGIN
+            IF TG_OP = ''INSERT'' THEN
+                INSERT INTO %I (tenant_id, id, inserted_at, external_id, type, location, external_location_key, inline_content, updated_at)
+                VALUES (NEW.tenant_id, NEW.id, NEW.inserted_at, NEW.external_id, NEW.type, NEW.location, NEW.external_location_key, NEW.inline_content, NEW.updated_at)
+                ON CONFLICT (tenant_id, id, inserted_at, type) DO UPDATE
+                SET
+                    location = EXCLUDED.location,
+                    external_location_key = EXCLUDED.external_location_key,
+                    inline_content = EXCLUDED.inline_content,
+                    updated_at = EXCLUDED.updated_at;
+                RETURN NEW;
+            ELSIF TG_OP = ''UPDATE'' THEN
+                UPDATE %I
+                SET
+                    location = NEW.location,
+                    external_location_key = NEW.external_location_key,
+                    inline_content = NEW.inline_content,
+                    updated_at = NEW.updated_at
+                WHERE
+                    tenant_id = NEW.tenant_id
+                    AND id = NEW.id
+                    AND inserted_at = NEW.inserted_at
+                    AND type = NEW.type;
+                RETURN NEW;
+            ELSIF TG_OP = ''DELETE'' THEN
+                DELETE FROM %I
+                WHERE
+                    tenant_id = OLD.tenant_id
+                    AND id = OLD.id
+                    AND inserted_at = OLD.inserted_at
+                    AND type = OLD.type;
+                RETURN OLD;
+            END IF;
+            RETURN NULL;
+        END;
+        $func$;
+    ', trigger_function_name, target_table_name, target_table_name, target_table_name);
+
+    EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I', trigger_name, source_partition_name);
+
+    EXECUTE format('
+        CREATE TRIGGER %I
+        AFTER INSERT OR UPDATE OR DELETE ON %I
+        FOR EACH ROW
+        EXECUTE FUNCTION %I();
+    ', trigger_name, source_partition_name, trigger_function_name);
+
+    RAISE NOTICE 'Created table % as a copy of partition % with sync trigger', target_table_name, source_partition_name;
+
+    RETURN target_table_name;
+END;
+$$;
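+
+-- Hedged usage sketch (illustrative comments, not part of the schema): for
+-- the 2025-11-28 partition, the call below creates
+-- v1_payload_offload_tmp_20251128 with the same shape as v1_payload_20251128,
+-- plus a trigger on the source partition that mirrors every
+-- INSERT/UPDATE/DELETE into the copy while the backfill runs:
+--
+--   SELECT copy_v1_payload_partition_structure('2025-11-28'::DATE);
+--   --> 'v1_payload_offload_tmp_20251128'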
+
+CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
+    partition_date date,
+    limit_param int,
+    offset_param bigint
+) RETURNS TABLE (
+    tenant_id UUID,
+    id BIGINT,
+    inserted_at TIMESTAMPTZ,
+    external_id UUID,
+    type v1_payload_type,
+    location v1_payload_location,
+    external_location_key TEXT,
+    inline_content JSONB,
+    updated_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        SELECT tenant_id, id, inserted_at, external_id, type, location,
+               external_location_key, inline_content, updated_at
+        FROM %I
+        ORDER BY tenant_id, inserted_at, id, type
+        LIMIT $1
+        OFFSET $2
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING limit_param, offset_param;
+END;
+$$;
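+
+-- Hedged usage sketch: pagination is stable because rows are ordered by the
+-- primary key columns, so a resumed job can safely re-issue the next page,
+-- e.g. the second page of 1000 rows for the example date:
+--
+--   SELECT * FROM list_paginated_payloads_for_offload('2025-11-28'::DATE, 1000, 1000);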
+
+CREATE OR REPLACE FUNCTION swap_v1_payload_partition_with_temp(
+    partition_date date
+) RETURNS text
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    temp_table_name varchar;
+    old_pk_name varchar;
+    new_pk_name varchar;
+    partition_start date;
+    partition_end date;
+    trigger_function_name varchar;
+    trigger_name varchar;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
+    SELECT format('v1_payload_offload_tmp_%s', partition_date_str) INTO temp_table_name;
+    SELECT format('v1_payload_offload_tmp_%s_pkey', partition_date_str) INTO old_pk_name;
+    SELECT format('v1_payload_%s_pkey', partition_date_str) INTO new_pk_name;
+    SELECT format('sync_to_%s', temp_table_name) INTO trigger_function_name;
+    SELECT format('trigger_sync_to_%s', temp_table_name) INTO trigger_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = temp_table_name) THEN
+        RAISE EXCEPTION 'Temp table % does not exist', temp_table_name;
+    END IF;
+
+    partition_start := partition_date;
+    partition_end := partition_date + INTERVAL '1 day';
+
+    EXECUTE format(
+        'ALTER TABLE %I SET (
+            autovacuum_vacuum_scale_factor = ''0.1'',
+            autovacuum_analyze_scale_factor = ''0.05'',
+            autovacuum_vacuum_threshold = ''25'',
+            autovacuum_analyze_threshold = ''25'',
+            autovacuum_vacuum_cost_delay = ''10'',
+            autovacuum_vacuum_cost_limit = ''1000''
+        )',
+        temp_table_name
+    );
+    RAISE NOTICE 'Set autovacuum settings on partition %', temp_table_name;
+
+    LOCK TABLE v1_payload IN ACCESS EXCLUSIVE MODE;
+
+    RAISE NOTICE 'Dropping trigger from partition %', source_partition_name;
+    EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I', trigger_name, source_partition_name);
+
+    RAISE NOTICE 'Dropping trigger function %', trigger_function_name;
+    EXECUTE format('DROP FUNCTION IF EXISTS %I()', trigger_function_name);
+
+    IF EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE NOTICE 'Dropping old partition %', source_partition_name;
+        EXECUTE format('ALTER TABLE v1_payload DETACH PARTITION %I', source_partition_name);
+        EXECUTE format('DROP TABLE %I CASCADE', source_partition_name);
+    END IF;
+
+    RAISE NOTICE 'Renaming primary key % to %', old_pk_name, new_pk_name;
+    EXECUTE format('ALTER INDEX %I RENAME TO %I', old_pk_name, new_pk_name);
+
+    RAISE NOTICE 'Renaming temp table % to %', temp_table_name, source_partition_name;
+    EXECUTE format('ALTER TABLE %I RENAME TO %I', temp_table_name, source_partition_name);
+
+    RAISE NOTICE 'Attaching new partition % to v1_payload', source_partition_name;
+    EXECUTE format(
+        'ALTER TABLE v1_payload ATTACH PARTITION %I FOR VALUES FROM (%L) TO (%L)',
+        source_partition_name,
+        partition_start,
+        partition_end
+    );
+
+    RAISE NOTICE 'Dropping temporary bounds check constraint';
+    EXECUTE format(
+        'ALTER TABLE %I DROP CONSTRAINT %I',
+        source_partition_name,
+        temp_table_name || '_iat_chk_bounds'
+    );
+
+    RAISE NOTICE 'Successfully swapped partition %', source_partition_name;
+    RETURN source_partition_name;
+END;
+$$;
+
 CREATE TABLE v1_idempotency_key (
     tenant_id UUID NOT NULL,