From bede3efe0dccce3378491bb45599f8fa0f17f2ff Mon Sep 17 00:00:00 2001 From: matt Date: Mon, 8 Dec 2025 11:00:24 -0500 Subject: [PATCH] Feat: Process all old partitions in a loop (#2613) * feat: process old partitions in a loop * fix: param * fix: query return * feat: add spans * fix: naming --- .../v1/task/process_payload_wal.go | 2 +- pkg/repository/v1/payloadstore.go | 59 ++++++++++++++----- .../v1/sqlcv1/payload-store-overwrite.sql.go | 48 +++++++++++++++ 3 files changed, 93 insertions(+), 16 deletions(-) diff --git a/internal/services/controllers/v1/task/process_payload_wal.go b/internal/services/controllers/v1/task/process_payload_wal.go index 943de60b0..a3630b907 100644 --- a/internal/services/controllers/v1/task/process_payload_wal.go +++ b/internal/services/controllers/v1/task/process_payload_wal.go @@ -14,7 +14,7 @@ func (tc *TasksControllerImpl) processPayloadExternalCutovers(ctx context.Contex tc.l.Debug().Msgf("payload external cutover: processing external cutover payloads") - err := tc.repov1.Payloads().CopyOffloadedPayloadsIntoTempTable(ctx) + err := tc.repov1.Payloads().ProcessPayloadCutovers(ctx) if err != nil { span.RecordError(err) diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go index b720fd40d..9575b7a3e 100644 --- a/pkg/repository/v1/payloadstore.go +++ b/pkg/repository/v1/payloadstore.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" @@ -65,7 +66,7 @@ type PayloadStoreRepository interface { ExternalCutoverProcessInterval() time.Duration ExternalStoreEnabled() bool ExternalStore() ExternalStore - CopyOffloadedPayloadsIntoTempTable(ctx context.Context) error + ProcessPayloadCutovers(ctx context.Context) error } type payloadStoreRepositoryImpl struct { @@ -566,17 +567,11 @@ func (p *payloadStoreRepositoryImpl) acquireOrExtendJobLease(ctx context.Context }, nil } -func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context, processId pgtype.UUID) (*CutoverJobRunMetadata, error) { +func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate) (*CutoverJobRunMetadata, error) { if p.inlineStoreTTL == nil { return nil, fmt.Errorf("inline store TTL is not set") } - partitionTime := time.Now().Add(-1 * *p.inlineStoreTTL) - partitionDate := PartitionDate(pgtype.Date{ - Time: partitionTime, - Valid: true, - }) - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000) if err != nil { @@ -615,13 +610,11 @@ func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context, }, nil } -func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx context.Context) error { - if !p.externalStoreEnabled { - return nil - } +func (p *payloadStoreRepositoryImpl) processSinglePartition(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate) error { + ctx, span := telemetry.NewSpan(ctx, "payload_store_repository_impl.processSinglePartition") + defer span.End() - processId := sqlchelpers.UUIDFromStr(uuid.NewString()) - jobMeta, err := p.prepareCutoverTableJob(ctx, processId) + jobMeta, err := p.prepareCutoverTableJob(ctx, processId, partitionDate) if err != nil { return fmt.Errorf("failed to prepare cutover table job: %w", err) @@ -631,7 +624,6 @@ func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx cont return nil } - partitionDate := jobMeta.PartitionDate offset := jobMeta.LastOffset for { @@ -685,6 +677,43 @@ func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx cont return fmt.Errorf("failed to commit swap payload cutover temp table transaction: %w", err) } + return nil +} + +func (p *payloadStoreRepositoryImpl) ProcessPayloadCutovers(ctx context.Context) error { + if !p.externalStoreEnabled { + return nil + } + + ctx, span := telemetry.NewSpan(ctx, "payload_store_repository_impl.ProcessPayloadCutovers") + defer span.End() + + if p.inlineStoreTTL == nil { + return fmt.Errorf("inline store TTL is not set") + } + + mostRecentPartitionToOffload := pgtype.Date{ + Time: time.Now().Add(-1 * *p.inlineStoreTTL), + Valid: true, + } + + partitions, err := p.queries.FindV1PayloadPartitionsBeforeDate(ctx, p.pool, mostRecentPartitionToOffload) + + if err != nil { + return fmt.Errorf("failed to find payload partitions before date %s: %w", mostRecentPartitionToOffload.Time.String(), err) + } + + processId := sqlchelpers.UUIDFromStr(uuid.NewString()) + + for _, partition := range partitions { + p.l.Info().Str("partition", partition.PartitionName).Msg("processing payload cutover for partition") + err = p.processSinglePartition(ctx, processId, PartitionDate(partition.PartitionDate)) + + if err != nil { + return fmt.Errorf("failed to process partition %s: %w", partition.PartitionName, err) + } + } + return nil } diff --git a/pkg/repository/v1/sqlcv1/payload-store-overwrite.sql.go b/pkg/repository/v1/sqlcv1/payload-store-overwrite.sql.go index f94aaa766..729c0b80b 100644 --- a/pkg/repository/v1/sqlcv1/payload-store-overwrite.sql.go +++ b/pkg/repository/v1/sqlcv1/payload-store-overwrite.sql.go @@ -113,3 +113,51 @@ func ComparePartitionRowCounts(ctx context.Context, tx DBTX, tempPartitionName, return tempPartitionCount == sourcePartitionCount, nil } + +const findV1PayloadPartitionsBeforeDate = `-- name: findV1PayloadPartitionsBeforeDate :many +WITH partitions AS ( + SELECT + child.relname::text AS partition_name, + SUBSTRING(pg_get_expr(child.relpartbound, child.oid) FROM 'FROM \(''([^'']+)')::DATE AS lower_bound, + SUBSTRING(pg_get_expr(child.relpartbound, child.oid) FROM 'TO \(''([^'']+)')::DATE AS upper_bound + FROM pg_inherits + JOIN pg_class parent ON pg_inherits.inhparent = parent.oid + JOIN pg_class child ON pg_inherits.inhrelid = child.oid + WHERE parent.relname = 'v1_payload' + ORDER BY child.relname +) + +SELECT partition_name, lower_bound AS partition_date +FROM partitions +WHERE lower_bound <= $1::DATE +` + +type FindV1PayloadPartitionsBeforeDateRow struct { + PartitionName string `json:"partition_name"` + PartitionDate pgtype.Date `json:"partition_date"` +} + +func (q *Queries) FindV1PayloadPartitionsBeforeDate(ctx context.Context, db DBTX, date pgtype.Date) ([]*FindV1PayloadPartitionsBeforeDateRow, error) { + rows, err := db.Query(ctx, findV1PayloadPartitionsBeforeDate, + date, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*FindV1PayloadPartitionsBeforeDateRow + for rows.Next() { + var i FindV1PayloadPartitionsBeforeDateRow + if err := rows.Scan( + &i.PartitionName, + &i.PartitionDate, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +}