From a7aaa05b9cd8b027638eae8fbfefae5e19fadd2b Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Mon, 15 Sep 2025 16:41:09 -0400 Subject: [PATCH] debug: add logging --- .../services/controllers/v1/task/controller.go | 2 +- pkg/repository/v1/payloadstore.go | 18 ++++++++++++++++++ pkg/repository/v1/sqlcv1/payload-store.sql | 1 - 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/internal/services/controllers/v1/task/controller.go b/internal/services/controllers/v1/task/controller.go index f4c653188..58c19f3a7 100644 --- a/internal/services/controllers/v1/task/controller.go +++ b/internal/services/controllers/v1/task/controller.go @@ -231,7 +231,7 @@ func New(fs ...TasksControllerOpt) (*TasksControllerImpl, error) { t.emitSleepOperations = queueutils.NewOperationPool(opts.l, timeout, "emit sleep step runs", t.processSleeps).WithJitter(jitter) t.reassignTaskOperations = queueutils.NewOperationPool(opts.l, timeout, "reassign step runs", t.processTaskReassignments).WithJitter(jitter) t.retryTaskOperations = queueutils.NewOperationPool(opts.l, timeout, "retry step runs", t.processTaskRetryQueueItems).WithJitter(jitter) - t.processPayloadWALOperations = queueutils.NewOperationPool[int64](opts.l, timeout, "process payload WAL", t.processPayloadWAL).WithJitter(jitter) + t.processPayloadWALOperations = queueutils.NewOperationPool(opts.l, timeout, "process payload WAL", t.processPayloadWAL).WithJitter(jitter) t.evictExpiredIdempotencyKeysOperations = queueutils.NewOperationPool(opts.l, timeout, "evict expired idempotency keys", t.evictExpiredIdempotencyKeys).WithJitter(jitter) return t, nil diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go index bd9a3909e..ea376ffcf 100644 --- a/pkg/repository/v1/payloadstore.go +++ b/pkg/repository/v1/payloadstore.go @@ -246,6 +246,12 @@ func (p *payloadStoreRepositoryImpl) offloadToExternal(ctx context.Context, payl func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, partitionNumber int64) (bool, error) { // no need to process the WAL if external store is not enabled + fmt.Printf( + "DEBUG %s: processing payload WAL, external store enabled: %t, inline store TTL: %v\n", + time.Now().String(), + p.externalStoreEnabled, + p.inlineStoreTTL, + ) if !p.externalStoreEnabled { return false, nil } @@ -253,16 +259,24 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part ctx, span := telemetry.NewSpan(ctx, "process-payload-wal") defer span.End() + fmt.Printf("DEBUG %s: processing payload WAL for partition %d\n", time.Now().String(), partitionNumber) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 5000) + fmt.Printf("DEBUG %s: prepared tx for processing payload WAL for partition %d\n", time.Now().String(), partitionNumber) + if err != nil { return false, fmt.Errorf("failed to prepare transaction: %w", err) } defer rollback() + fmt.Printf("DEBUG %s: acquiring advisory lock for processing payload WAL for partition %d\n", time.Now().String(), partitionNumber) + advisoryLockAcquired, err := p.queries.TryAdvisoryLock(ctx, tx, hash(fmt.Sprintf("process-payload-wal-lease-%d", partitionNumber))) + fmt.Printf("DEBUG %s: acquired advisory lock (%t) for processing payload WAL for partition %d\n", time.Now().String(), advisoryLockAcquired, partitionNumber) + if err != nil { return false, fmt.Errorf("failed to acquire advisory lock: %w", err) } @@ -278,8 +292,12 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part Partitionnumber: int32(partitionNumber), }) + fmt.Printf("DEBUG %s: polled %d WAL records for processing payload WAL for partition %d\n", time.Now().String(), len(walRecords), partitionNumber) + hasMoreWALRecords := len(walRecords) == pollLimit + fmt.Printf("DEBUG %s: has more WAL records: %t for partition %d\n", time.Now().String(), hasMoreWALRecords, partitionNumber) + if len(walRecords) == 0 { return false, nil } diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql b/pkg/repository/v1/sqlcv1/payload-store.sql index a8797c265..424bc0511 100644 --- a/pkg/repository/v1/sqlcv1/payload-store.sql +++ b/pkg/repository/v1/sqlcv1/payload-store.sql @@ -106,7 +106,6 @@ WHERE ORDER BY offload_at, payload_id, payload_inserted_at, payload_type, tenant_id FOR UPDATE LIMIT @pollLimit::INT - ; -- name: FinalizePayloadOffloads :exec