diff --git a/cmd/hatchet-migrate/migrate/migrations/20251205170304_v1_0_55.sql b/cmd/hatchet-migrate/migrate/migrations/20251205170304_v1_0_55.sql new file mode 100644 index 000000000..f63a5ea86 --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20251205170304_v1_0_55.sql @@ -0,0 +1,14 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE v1_payload_cutover_job_offset +ADD COLUMN lease_process_id UUID NOT NULL DEFAULT gen_random_uuid(), +ADD COLUMN lease_expires_at TIMESTAMPTZ NOT NULL DEFAULT NOW(); + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE v1_payload_cutover_job_offset +DROP COLUMN lease_expires_at, +DROP COLUMN lease_process_id; +-- +goose StatementEnd diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go index 60c8b1626..b720fd40d 100644 --- a/pkg/repository/v1/payloadstore.go +++ b/pkg/repository/v1/payloadstore.go @@ -2,10 +2,12 @@ package v1 import ( "context" + "errors" "fmt" "sort" "time" + "github.com/google/uuid" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" "github.com/jackc/pgx/v5" @@ -397,7 +399,13 @@ type CutoverBatchOutcome struct { NextOffset int64 } -func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Context, partitionDate pgtype.Date, partitionDateStr string, offset int64) (*CutoverBatchOutcome, error) { +type PartitionDate pgtype.Date + +func (d PartitionDate) String() string { + return d.Time.Format("20060102") +} + +func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, offset int64) (*CutoverBatchOutcome, error) { tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000) if err != nil { @@ -406,9 +414,9 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont defer rollback() - tableName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDateStr) + tableName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDate.String()) payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedPayloadsForOffloadParams{ - Partitiondate: partitionDate, + Partitiondate: pgtype.Date(partitionDate), Offsetparam: offset, Limitparam: p.externalCutoverBatchSize, }) @@ -486,13 +494,10 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont offset += int64(len(payloads)) - err = p.queries.UpsertLastOffsetForCutoverJob(ctx, tx, sqlcv1.UpsertLastOffsetForCutoverJobParams{ - Key: partitionDate, - Lastoffset: offset, - }) + _, err = p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, offset) if err != nil { - return nil, fmt.Errorf("failed to upsert last offset for cutover job: %w", err) + return nil, fmt.Errorf("failed to extend cutover job lease: %w", err) } if err := commit(ctx); err != nil { @@ -513,23 +518,64 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont } type CutoverJobRunMetadata struct { - ShouldRun bool - LastOffset int64 - PartitionDate pgtype.Date - PartitionDateStr string + ShouldRun bool + LastOffset int64 + PartitionDate PartitionDate + LeaseProcessId pgtype.UUID } -func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context) (*CutoverJobRunMetadata, error) { +func (p *payloadStoreRepositoryImpl) acquireOrExtendJobLease(ctx context.Context, tx pgx.Tx, processId pgtype.UUID, partitionDate PartitionDate, offset int64) (*CutoverJobRunMetadata, error) { + leaseInterval := 2 * time.Minute + leaseExpiresAt := sqlchelpers.TimestamptzFromTime(time.Now().Add(leaseInterval)) + + lease, err := p.queries.AcquireOrExtendCutoverJobLease(ctx, tx, sqlcv1.AcquireOrExtendCutoverJobLeaseParams{ + Key: pgtype.Date(partitionDate), + Lastoffset: offset, + Leaseprocessid: processId, + Leaseexpiresat: leaseExpiresAt, + }) + + if err != nil { + // ErrNoRows here means that something else is holding the lease + // since we did not insert a new record, and the `UPDATE` returned an empty set + if errors.Is(err, pgx.ErrNoRows) { + return &CutoverJobRunMetadata{ + ShouldRun: false, + LastOffset: 0, + PartitionDate: partitionDate, + LeaseProcessId: processId, + }, nil + } + return nil, fmt.Errorf("failed to create initial cutover job lease: %w", err) + } + + if lease.LeaseProcessID != processId || lease.IsCompleted { + return &CutoverJobRunMetadata{ + ShouldRun: false, + LastOffset: lease.LastOffset, + PartitionDate: partitionDate, + LeaseProcessId: lease.LeaseProcessID, + }, nil + } + + return &CutoverJobRunMetadata{ + ShouldRun: true, + LastOffset: lease.LastOffset, + PartitionDate: partitionDate, + LeaseProcessId: processId, + }, nil +} + +func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context, processId pgtype.UUID) (*CutoverJobRunMetadata, error) { if p.inlineStoreTTL == nil { return nil, fmt.Errorf("inline store TTL is not set") } partitionTime := time.Now().Add(-1 * *p.inlineStoreTTL) - partitionDate := pgtype.Date{ + partitionDate := PartitionDate(pgtype.Date{ Time: partitionTime, Valid: true, - } - partitionDateStr := partitionTime.Format("20060102") + }) tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000) @@ -539,50 +585,19 @@ func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context) defer rollback() - hashKey := fmt.Sprintf("payload-cutover-temp-table-lease-%s", partitionDateStr) - - lockAcquired, err := p.queries.TryAdvisoryLock(ctx, tx, hash(hashKey)) + lease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, 0) if err != nil { - return nil, fmt.Errorf("failed to acquire advisory lock for payload cutover temp table: %w", err) + return nil, fmt.Errorf("failed to acquire or extend cutover job lease: %w", err) } - if !lockAcquired { - return &CutoverJobRunMetadata{ - ShouldRun: false, - LastOffset: 0, - PartitionDate: partitionDate, - PartitionDateStr: partitionDateStr, - }, nil + if !lease.ShouldRun { + return lease, nil } - jobStatus, err := p.queries.FindLastOffsetForCutoverJob(ctx, p.pool, partitionDate) + offset := lease.LastOffset - var offset int64 - var isCompleted bool - - if err != nil { - if err == pgx.ErrNoRows { - offset = 0 - isCompleted = false - } else { - return nil, fmt.Errorf("failed to find last offset for cutover job: %w", err) - } - } else { - offset = jobStatus.LastOffset - isCompleted = jobStatus.IsCompleted - } - - if isCompleted { - return &CutoverJobRunMetadata{ - ShouldRun: false, - LastOffset: offset, - PartitionDate: partitionDate, - PartitionDateStr: partitionDateStr, - }, nil - } - - err = p.queries.CreateV1PayloadCutoverTemporaryTable(ctx, tx, partitionDate) + err = p.queries.CreateV1PayloadCutoverTemporaryTable(ctx, tx, pgtype.Date(partitionDate)) if err != nil { return nil, fmt.Errorf("failed to create payload cutover temporary table: %w", err) @@ -593,10 +608,10 @@ func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context) } return &CutoverJobRunMetadata{ - ShouldRun: true, - LastOffset: offset, - PartitionDate: partitionDate, - PartitionDateStr: partitionDateStr, + ShouldRun: true, + LastOffset: offset, + PartitionDate: partitionDate, + LeaseProcessId: processId, }, nil } @@ -605,7 +620,8 @@ func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx cont return nil } - jobMeta, err := p.prepareCutoverTableJob(ctx) + processId := sqlchelpers.UUIDFromStr(uuid.NewString()) + jobMeta, err := p.prepareCutoverTableJob(ctx, processId) if err != nil { return fmt.Errorf("failed to prepare cutover table job: %w", err) @@ -616,11 +632,10 @@ func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx cont } partitionDate := jobMeta.PartitionDate - partitionDateStr := jobMeta.PartitionDateStr offset := jobMeta.LastOffset for { - outcome, err := p.ProcessPayloadCutoverBatch(ctx, partitionDate, partitionDateStr, offset) + outcome, err := p.ProcessPayloadCutoverBatch(ctx, processId, partitionDate, offset) if err != nil { return fmt.Errorf("failed to process payload cutover batch: %w", err) @@ -633,8 +648,8 @@ func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx cont offset = outcome.NextOffset } - tempPartitionName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDateStr) - sourcePartitionName := fmt.Sprintf("v1_payload_%s", partitionDateStr) + tempPartitionName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDate.String()) + sourcePartitionName := fmt.Sprintf("v1_payload_%s", partitionDate.String()) countsEqual, err := sqlcv1.ComparePartitionRowCounts(ctx, p.pool, tempPartitionName, sourcePartitionName) @@ -643,7 +658,7 @@ func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx cont } if !countsEqual { - return fmt.Errorf("row counts do not match between temp and source partitions for date %s", partitionDateStr) + return fmt.Errorf("row counts do not match between temp and source partitions for date %s", partitionDate.String()) } tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000) @@ -654,13 +669,13 @@ func (p *payloadStoreRepositoryImpl) CopyOffloadedPayloadsIntoTempTable(ctx cont defer rollback() - err = p.queries.SwapV1PayloadPartitionWithTemp(ctx, tx, partitionDate) + err = p.queries.SwapV1PayloadPartitionWithTemp(ctx, tx, pgtype.Date(partitionDate)) if err != nil { return fmt.Errorf("failed to swap payload cutover temp table: %w", err) } - err = p.queries.MarkCutoverJobAsCompleted(ctx, tx, partitionDate) + err = p.queries.MarkCutoverJobAsCompleted(ctx, tx, pgtype.Date(partitionDate)) if err != nil { return fmt.Errorf("failed to mark cutover job as completed: %w", err) diff --git a/pkg/repository/v1/sqlcv1/models.go b/pkg/repository/v1/sqlcv1/models.go index 35689175d..5fb25dfde 100644 --- a/pkg/repository/v1/sqlcv1/models.go +++ b/pkg/repository/v1/sqlcv1/models.go @@ -3130,9 +3130,11 @@ type V1Payload struct { } type V1PayloadCutoverJobOffset struct { - Key pgtype.Date `json:"key"` - LastOffset int64 `json:"last_offset"` - IsCompleted bool `json:"is_completed"` + Key pgtype.Date `json:"key"` + LastOffset int64 `json:"last_offset"` + IsCompleted bool `json:"is_completed"` + LeaseProcessID pgtype.UUID `json:"lease_process_id"` + LeaseExpiresAt pgtype.Timestamptz `json:"lease_expires_at"` } type V1PayloadCutoverQueueItem struct { diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql b/pkg/repository/v1/sqlcv1/payload-store.sql index e41c79f8e..0c85581cb 100644 --- a/pkg/repository/v1/sqlcv1/payload-store.sql +++ b/pkg/repository/v1/sqlcv1/payload-store.sql @@ -244,16 +244,16 @@ SELECT copy_v1_payload_partition_structure(@date::DATE); -- name: SwapV1PayloadPartitionWithTemp :exec SELECT swap_v1_payload_partition_with_temp(@date::DATE); --- name: FindLastOffsetForCutoverJob :one -SELECT * -FROM v1_payload_cutover_job_offset -WHERE key = @key::DATE; - --- name: UpsertLastOffsetForCutoverJob :exec -INSERT INTO v1_payload_cutover_job_offset (key, last_offset) -VALUES (@key::DATE, @lastOffset::BIGINT) +-- name: AcquireOrExtendCutoverJobLease :one +INSERT INTO v1_payload_cutover_job_offset (key, last_offset, lease_process_id, lease_expires_at) +VALUES (@key::DATE, @lastOffset::BIGINT, @leaseProcessId::UUID, @leaseExpiresAt::TIMESTAMPTZ) ON CONFLICT (key) -DO UPDATE SET last_offset = EXCLUDED.last_offset +DO UPDATE SET + last_offset = EXCLUDED.last_offset, + lease_process_id = EXCLUDED.lease_process_id, + lease_expires_at = EXCLUDED.lease_expires_at +WHERE v1_payload_cutover_job_offset.lease_expires_at < NOW() OR v1_payload_cutover_job_offset.lease_process_id = @leaseProcessId::UUID +RETURNING * ; -- name: MarkCutoverJobAsCompleted :exec diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql.go b/pkg/repository/v1/sqlcv1/payload-store.sql.go index e88022550..49ebb9c88 100644 --- a/pkg/repository/v1/sqlcv1/payload-store.sql.go +++ b/pkg/repository/v1/sqlcv1/payload-store.sql.go @@ -11,6 +11,43 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const acquireOrExtendCutoverJobLease = `-- name: AcquireOrExtendCutoverJobLease :one +INSERT INTO v1_payload_cutover_job_offset (key, last_offset, lease_process_id, lease_expires_at) +VALUES ($1::DATE, $2::BIGINT, $3::UUID, $4::TIMESTAMPTZ) +ON CONFLICT (key) +DO UPDATE SET + last_offset = EXCLUDED.last_offset, + lease_process_id = EXCLUDED.lease_process_id, + lease_expires_at = EXCLUDED.lease_expires_at +WHERE v1_payload_cutover_job_offset.lease_expires_at < NOW() OR v1_payload_cutover_job_offset.lease_process_id = $3::UUID +RETURNING key, last_offset, is_completed, lease_process_id, lease_expires_at +` + +type AcquireOrExtendCutoverJobLeaseParams struct { + Key pgtype.Date `json:"key"` + Lastoffset int64 `json:"lastoffset"` + Leaseprocessid pgtype.UUID `json:"leaseprocessid"` + Leaseexpiresat pgtype.Timestamptz `json:"leaseexpiresat"` +} + +func (q *Queries) AcquireOrExtendCutoverJobLease(ctx context.Context, db DBTX, arg AcquireOrExtendCutoverJobLeaseParams) (*V1PayloadCutoverJobOffset, error) { + row := db.QueryRow(ctx, acquireOrExtendCutoverJobLease, + arg.Key, + arg.Lastoffset, + arg.Leaseprocessid, + arg.Leaseexpiresat, + ) + var i V1PayloadCutoverJobOffset + err := row.Scan( + &i.Key, + &i.LastOffset, + &i.IsCompleted, + &i.LeaseProcessID, + &i.LeaseExpiresAt, + ) + return &i, err +} + const analyzeV1Payload = `-- name: AnalyzeV1Payload :exec ANALYZE v1_payload ` @@ -83,19 +120,6 @@ func (q *Queries) CutOverPayloadsToExternal(ctx context.Context, db DBTX, arg Cu return count, err } -const findLastOffsetForCutoverJob = `-- name: FindLastOffsetForCutoverJob :one -SELECT key, last_offset, is_completed -FROM v1_payload_cutover_job_offset -WHERE key = $1::DATE -` - -func (q *Queries) FindLastOffsetForCutoverJob(ctx context.Context, db DBTX, key pgtype.Date) (*V1PayloadCutoverJobOffset, error) { - row := db.QueryRow(ctx, findLastOffsetForCutoverJob, key) - var i V1PayloadCutoverJobOffset - err := row.Scan(&i.Key, &i.LastOffset, &i.IsCompleted) - return &i, err -} - const listPaginatedPayloadsForOffload = `-- name: ListPaginatedPayloadsForOffload :many WITH payloads AS ( SELECT @@ -433,23 +457,6 @@ func (q *Queries) SwapV1PayloadPartitionWithTemp(ctx context.Context, db DBTX, d return err } -const upsertLastOffsetForCutoverJob = `-- name: UpsertLastOffsetForCutoverJob :exec -INSERT INTO v1_payload_cutover_job_offset (key, last_offset) -VALUES ($1::DATE, $2::BIGINT) -ON CONFLICT (key) -DO UPDATE SET last_offset = EXCLUDED.last_offset -` - -type UpsertLastOffsetForCutoverJobParams struct { - Key pgtype.Date `json:"key"` - Lastoffset int64 `json:"lastoffset"` -} - -func (q *Queries) UpsertLastOffsetForCutoverJob(ctx context.Context, db DBTX, arg UpsertLastOffsetForCutoverJobParams) error { - _, err := db.Exec(ctx, upsertLastOffsetForCutoverJob, arg.Key, arg.Lastoffset) - return err -} - const writePayloadWAL = `-- name: WritePayloadWAL :exec WITH inputs AS ( SELECT diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index 39a97a5bb..03b11aa8a 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -1759,7 +1759,9 @@ $$; CREATE TABLE v1_payload_cutover_job_offset ( key DATE PRIMARY KEY, last_offset BIGINT NOT NULL, - is_completed BOOLEAN NOT NULL DEFAULT FALSE + is_completed BOOLEAN NOT NULL DEFAULT FALSE, + lease_process_id UUID NOT NULL DEFAULT gen_random_uuid(), + lease_expires_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE OR REPLACE FUNCTION copy_v1_payload_partition_structure(