Feat: Process all old partitions in a loop (#2613)

* feat: process old partitions in a loop

* fix: param

* fix: query return

* feat: add spans

* fix: naming
This commit is contained in:
matt
2025-12-08 11:00:24 -05:00
committed by GitHub
parent cebf3a6fa7
commit bede3efe0d
3 changed files with 93 additions and 16 deletions

View File

@@ -14,7 +14,7 @@ func (tc *TasksControllerImpl) processPayloadExternalCutovers(ctx context.Contex
tc.l.Debug().Msgf("payload external cutover: processing external cutover payloads")
err := tc.repov1.Payloads().CopyOffloadedPayloadsIntoTempTable(ctx)
err := tc.repov1.Payloads().ProcessPayloadCutovers(ctx)
if err != nil {
span.RecordError(err)

View File

@@ -10,6 +10,7 @@ import (
"github.com/google/uuid"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
"github.com/hatchet-dev/hatchet/pkg/telemetry"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
@@ -65,7 +66,7 @@ type PayloadStoreRepository interface {
ExternalCutoverProcessInterval() time.Duration
ExternalStoreEnabled() bool
ExternalStore() ExternalStore
CopyOffloadedPayloadsIntoTempTable(ctx context.Context) error
ProcessPayloadCutovers(ctx context.Context) error
}
type payloadStoreRepositoryImpl struct {
@@ -566,17 +567,11 @@ func (p *payloadStoreRepositoryImpl) acquireOrExtendJobLease(ctx context.Context
}, nil
}
func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context, processId pgtype.UUID) (*CutoverJobRunMetadata, error) {
func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate) (*CutoverJobRunMetadata, error) {
if p.inlineStoreTTL == nil {
return nil, fmt.Errorf("inline store TTL is not set")
}
partitionTime := time.Now().Add(-1 * *p.inlineStoreTTL)
partitionDate := PartitionDate(pgtype.Date{
Time: partitionTime,
Valid: true,
})
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000)
if err != nil {
@@ -615,13 +610,11 @@ func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context,
}, nil
}
func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx context.Context) error {
if !p.externalStoreEnabled {
return nil
}
func (p *payloadStoreRepositoryImpl) processSinglePartition(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate) error {
ctx, span := telemetry.NewSpan(ctx, "payload_store_repository_impl.processSinglePartition")
defer span.End()
processId := sqlchelpers.UUIDFromStr(uuid.NewString())
jobMeta, err := p.prepareCutoverTableJob(ctx, processId)
jobMeta, err := p.prepareCutoverTableJob(ctx, processId, partitionDate)
if err != nil {
return fmt.Errorf("failed to prepare cutover table job: %w", err)
@@ -631,7 +624,6 @@ func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx cont
return nil
}
partitionDate := jobMeta.PartitionDate
offset := jobMeta.LastOffset
for {
@@ -685,6 +677,43 @@ func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx cont
return fmt.Errorf("failed to commit swap payload cutover temp table transaction: %w", err)
}
return nil
}
// ProcessPayloadCutovers finds every v1_payload partition whose lower bound is
// at or before now minus the inline store TTL and offloads each one in turn via
// processSinglePartition. It is a no-op when the external store is disabled.
// Returns the first error encountered; all error paths are recorded on the
// span, matching the caller's pattern in processPayloadExternalCutovers.
func (p *payloadStoreRepositoryImpl) ProcessPayloadCutovers(ctx context.Context) error {
	if !p.externalStoreEnabled {
		return nil
	}

	ctx, span := telemetry.NewSpan(ctx, "payload_store_repository_impl.ProcessPayloadCutovers")
	defer span.End()

	if p.inlineStoreTTL == nil {
		err := fmt.Errorf("inline store TTL is not set")
		span.RecordError(err)
		return err
	}

	// Partitions whose lower bound is at or before this date are eligible
	// for offload (the query's comparison is inclusive).
	mostRecentPartitionToOffload := pgtype.Date{
		Time:  time.Now().Add(-1 * *p.inlineStoreTTL),
		Valid: true,
	}

	partitions, err := p.queries.FindV1PayloadPartitionsBeforeDate(ctx, p.pool, mostRecentPartitionToOffload)
	if err != nil {
		err = fmt.Errorf("failed to find payload partitions before date %s: %w", mostRecentPartitionToOffload.Time.String(), err)
		span.RecordError(err)
		return err
	}

	// One process ID identifies this whole run across all partitions, so the
	// per-partition job lease/metadata is attributable to a single process.
	processId := sqlchelpers.UUIDFromStr(uuid.NewString())

	for _, partition := range partitions {
		p.l.Info().Str("partition", partition.PartitionName).Msg("processing payload cutover for partition")

		if err := p.processSinglePartition(ctx, processId, PartitionDate(partition.PartitionDate)); err != nil {
			err = fmt.Errorf("failed to process partition %s: %w", partition.PartitionName, err)
			span.RecordError(err)
			return err
		}
	}

	return nil
}

View File

@@ -113,3 +113,51 @@ func ComparePartitionRowCounts(ctx context.Context, tx DBTX, tempPartitionName,
return tempPartitionCount == sourcePartitionCount, nil
}
// findV1PayloadPartitionsBeforeDate lists every child partition of v1_payload
// whose lower bound is at or before $1 — the comparison is inclusive (<=),
// despite the "before" in the name. Bounds are extracted by regexp from the
// textual partition-bound expression returned by pg_get_expr.
// NOTE(review): the ORDER BY sits inside the CTE; the outer SELECT has no
// ORDER BY, so result ordering is not strictly guaranteed by SQL semantics —
// confirm callers do not depend on partition-name order.
const findV1PayloadPartitionsBeforeDate = `-- name: findV1PayloadPartitionsBeforeDate :many
WITH partitions AS (
    SELECT
        child.relname::text AS partition_name,
        SUBSTRING(pg_get_expr(child.relpartbound, child.oid) FROM 'FROM \(''([^'']+)')::DATE AS lower_bound,
        SUBSTRING(pg_get_expr(child.relpartbound, child.oid) FROM 'TO \(''([^'']+)')::DATE AS upper_bound
    FROM pg_inherits
    JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
    JOIN pg_class child ON pg_inherits.inhrelid = child.oid
    WHERE parent.relname = 'v1_payload'
    ORDER BY child.relname
)
SELECT partition_name, lower_bound AS partition_date
FROM partitions
WHERE lower_bound <= $1::DATE
`
// FindV1PayloadPartitionsBeforeDateRow is one result row of
// FindV1PayloadPartitionsBeforeDate: a v1_payload child partition's name and
// its lower-bound date.
type FindV1PayloadPartitionsBeforeDateRow struct {
	PartitionName string      `json:"partition_name"`
	PartitionDate pgtype.Date `json:"partition_date"`
}
// FindV1PayloadPartitionsBeforeDate executes findV1PayloadPartitionsBeforeDate
// against db, scanning each row into a FindV1PayloadPartitionsBeforeDateRow.
// It returns all matching partitions, or the first query/scan/iteration error.
// NOTE: this follows the sqlc-generated wrapper shape; keep edits in sync with
// the generator's output.
func (q *Queries) FindV1PayloadPartitionsBeforeDate(ctx context.Context, db DBTX, date pgtype.Date) ([]*FindV1PayloadPartitionsBeforeDateRow, error) {
	rows, err := db.Query(ctx, findV1PayloadPartitionsBeforeDate,
		date,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []*FindV1PayloadPartitionsBeforeDateRow
	for rows.Next() {
		var i FindV1PayloadPartitionsBeforeDateRow
		if err := rows.Scan(
			&i.PartitionName,
			&i.PartitionDate,
		); err != nil {
			return nil, err
		}
		items = append(items, &i)
	}
	// rows.Err surfaces any error that terminated iteration early.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}