From bdedab653af289c2bb47f32515055eb1378a406f Mon Sep 17 00:00:00 2001 From: matt Date: Tue, 16 Sep 2025 12:44:55 -0400 Subject: [PATCH] Fix: WAL partition poll function type (#2301) * fix: type * fix: cast to int32 * debug: add logging * debug: more logs * Revert "debug: more logs" This reverts commit 2ff8033f89687e06d07384f6320052160c346e9f. * Revert "debug: add logging" This reverts commit a7aaa05b9cd8b027638eae8fbfefae5e19fadd2b. * fix: rm unnecessary generic * feat: span attrs + names * fix: span naming, more details --- .../controllers/v1/task/controller.go | 2 +- pkg/repository/v1/payloadstore.go | 33 +++++++++++++++++-- pkg/repository/v1/sqlcv1/payload-store.sql | 2 +- pkg/repository/v1/sqlcv1/payload-store.sql.go | 4 +-- 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/internal/services/controllers/v1/task/controller.go b/internal/services/controllers/v1/task/controller.go index f4c653188..58c19f3a7 100644 --- a/internal/services/controllers/v1/task/controller.go +++ b/internal/services/controllers/v1/task/controller.go @@ -231,7 +231,7 @@ func New(fs ...TasksControllerOpt) (*TasksControllerImpl, error) { t.emitSleepOperations = queueutils.NewOperationPool(opts.l, timeout, "emit sleep step runs", t.processSleeps).WithJitter(jitter) t.reassignTaskOperations = queueutils.NewOperationPool(opts.l, timeout, "reassign step runs", t.processTaskReassignments).WithJitter(jitter) t.retryTaskOperations = queueutils.NewOperationPool(opts.l, timeout, "retry step runs", t.processTaskRetryQueueItems).WithJitter(jitter) - t.processPayloadWALOperations = queueutils.NewOperationPool[int64](opts.l, timeout, "process payload WAL", t.processPayloadWAL).WithJitter(jitter) + t.processPayloadWALOperations = queueutils.NewOperationPool(opts.l, timeout, "process payload WAL", t.processPayloadWAL).WithJitter(jitter) t.evictExpiredIdempotencyKeysOperations = queueutils.NewOperationPool(opts.l, timeout, "evict expired idempotency keys", t.evictExpiredIdempotencyKeys).WithJitter(jitter) return t, nil diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go index ee5fd06a9..75fe2a8a7 100644 --- a/pkg/repository/v1/payloadstore.go +++ b/pkg/repository/v1/payloadstore.go @@ -11,6 +11,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" ) type StorePayloadOpts struct { @@ -236,6 +237,11 @@ func (p *payloadStoreRepositoryImpl) bulkRetrieve(ctx context.Context, tx sqlcv1 } func (p *payloadStoreRepositoryImpl) offloadToExternal(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error) { + ctx, span := telemetry.NewSpan(ctx, "payloadstore.offload_to_external_store") + defer span.End() + + span.SetAttributes(attribute.Int("payloadstore.offload_to_external_store.num_payloads_to_offload", len(payloads))) + // this is only intended to be called from ProcessPayloadWAL, which short-circuits if external store is not enabled if !p.externalStoreEnabled { return nil, fmt.Errorf("external store not enabled") @@ -250,7 +256,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part return false, nil } - ctx, span := telemetry.NewSpan(ctx, "process-payload-wal") + ctx, span := telemetry.NewSpan(ctx, "payloadstore.process_payload_wal") defer span.End() tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 5000) @@ -275,7 +281,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part walRecords, err := p.queries.PollPayloadWALForRecordsToOffload(ctx, tx, sqlcv1.PollPayloadWALForRecordsToOffloadParams{ Polllimit: int32(pollLimit), - Partitionnumber: partitionNumber, + Partitionnumber: int32(partitionNumber), }) hasMoreWALRecords := len(walRecords) == pollLimit @@ -310,6 +316,25 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part } externalStoreOpts := make([]OffloadToExternalStoreOpts, 0) + minOffloadAt := time.Now().Add(100 * time.Hour) + offloadLag := time.Since(minOffloadAt).Seconds() + + attrs := []attribute.KeyValue{ + { + Key: "payloadstore.process_payload_wal.payload_wal_offload_partition_number", + Value: attribute.Int64Value(partitionNumber), + }, + { + Key: "payloadstore.process_payload_wal.payload_wal_offload_count", + Value: attribute.IntValue(len(retrieveOpts)), + }, + { + Key: "payloadstore.process_payload_wal.payload_wal_offload_lag_seconds", + Value: attribute.Float64Value(offloadLag), + }, + } + + span.SetAttributes(attrs...) for opts, payload := range payloads { offloadAt, ok := retrieveOptsToOffloadAt[opts] @@ -328,6 +353,10 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part }, OffloadAt: offloadAt.Time, }) + + if offloadAt.Time.Before(minOffloadAt) { + minOffloadAt = offloadAt.Time + } } if err := commit(ctx); err != nil { diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql b/pkg/repository/v1/sqlcv1/payload-store.sql index bcda52d06..a8797c265 100644 --- a/pkg/repository/v1/sqlcv1/payload-store.sql +++ b/pkg/repository/v1/sqlcv1/payload-store.sql @@ -93,7 +93,7 @@ FROM WITH tenants AS ( SELECT UNNEST( find_matching_tenants_in_payload_wal_partition( - @partitionNumber::BIGINT + @partitionNumber::INT ) ) AS tenant_id ) diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql.go b/pkg/repository/v1/sqlcv1/payload-store.sql.go index 96d0b904b..28fee1ba8 100644 --- a/pkg/repository/v1/sqlcv1/payload-store.sql.go +++ b/pkg/repository/v1/sqlcv1/payload-store.sql.go @@ -67,7 +67,7 @@ const pollPayloadWALForRecordsToOffload = `-- name: PollPayloadWALForRecordsToOf WITH tenants AS ( SELECT UNNEST( find_matching_tenants_in_payload_wal_partition( - $2::BIGINT + $2::INT ) ) AS tenant_id ) @@ -84,7 +84,7 @@ LIMIT $1::INT type PollPayloadWALForRecordsToOffloadParams struct { Polllimit int32 `json:"polllimit"` - Partitionnumber int64 `json:"partitionnumber"` + Partitionnumber int32 `json:"partitionnumber"` } func (q *Queries) PollPayloadWALForRecordsToOffload(ctx context.Context, db DBTX, arg PollPayloadWALForRecordsToOffloadParams) ([]*V1PayloadWal, error) {