mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-28 03:38:52 -06:00
debug: add logging
This commit is contained in:
@@ -231,7 +231,7 @@ func New(fs ...TasksControllerOpt) (*TasksControllerImpl, error) {
|
||||
t.emitSleepOperations = queueutils.NewOperationPool(opts.l, timeout, "emit sleep step runs", t.processSleeps).WithJitter(jitter)
|
||||
t.reassignTaskOperations = queueutils.NewOperationPool(opts.l, timeout, "reassign step runs", t.processTaskReassignments).WithJitter(jitter)
|
||||
t.retryTaskOperations = queueutils.NewOperationPool(opts.l, timeout, "retry step runs", t.processTaskRetryQueueItems).WithJitter(jitter)
|
||||
t.processPayloadWALOperations = queueutils.NewOperationPool[int64](opts.l, timeout, "process payload WAL", t.processPayloadWAL).WithJitter(jitter)
|
||||
t.processPayloadWALOperations = queueutils.NewOperationPool(opts.l, timeout, "process payload WAL", t.processPayloadWAL).WithJitter(jitter)
|
||||
t.evictExpiredIdempotencyKeysOperations = queueutils.NewOperationPool(opts.l, timeout, "evict expired idempotency keys", t.evictExpiredIdempotencyKeys).WithJitter(jitter)
|
||||
|
||||
return t, nil
|
||||
|
||||
@@ -246,6 +246,12 @@ func (p *payloadStoreRepositoryImpl) offloadToExternal(ctx context.Context, payl
|
||||
|
||||
func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, partitionNumber int64) (bool, error) {
|
||||
// no need to process the WAL if external store is not enabled
|
||||
fmt.Printf(
|
||||
"DEBUG %s: processing payload WAL, external store enabled: %t, inline store TTL: %v\n",
|
||||
time.Now().String(),
|
||||
p.externalStoreEnabled,
|
||||
p.inlineStoreTTL,
|
||||
)
|
||||
if !p.externalStoreEnabled {
|
||||
return false, nil
|
||||
}
|
||||
@@ -253,16 +259,24 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
|
||||
ctx, span := telemetry.NewSpan(ctx, "process-payload-wal")
|
||||
defer span.End()
|
||||
|
||||
fmt.Printf("DEBUG %s: processing payload WAL for partition %d\n", time.Now().String(), partitionNumber)
|
||||
|
||||
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 5000)
|
||||
|
||||
fmt.Printf("DEBUG %s: prepared tx for processing payload WAL for partition %d\n", time.Now().String(), partitionNumber)
|
||||
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to prepare transaction: %w", err)
|
||||
}
|
||||
|
||||
defer rollback()
|
||||
|
||||
fmt.Printf("DEBUG %s: acquiring advisory lock for processing payload WAL for partition %d\n", time.Now().String(), partitionNumber)
|
||||
|
||||
advisoryLockAcquired, err := p.queries.TryAdvisoryLock(ctx, tx, hash(fmt.Sprintf("process-payload-wal-lease-%d", partitionNumber)))
|
||||
|
||||
fmt.Printf("DEBUG %s: acquired advisory lock (%t) for processing payload WAL for partition %d\n", time.Now().String(), advisoryLockAcquired, partitionNumber)
|
||||
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to acquire advisory lock: %w", err)
|
||||
}
|
||||
@@ -278,8 +292,12 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
|
||||
Partitionnumber: int32(partitionNumber),
|
||||
})
|
||||
|
||||
fmt.Printf("DEBUG %s: polled %d WAL records for processing payload WAL for partition %d\n", time.Now().String(), len(walRecords), partitionNumber)
|
||||
|
||||
hasMoreWALRecords := len(walRecords) == pollLimit
|
||||
|
||||
fmt.Printf("DEBUG %s: has more WAL records: %t for partition %d\n", time.Now().String(), hasMoreWALRecords, partitionNumber)
|
||||
|
||||
if len(walRecords) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -106,7 +106,6 @@ WHERE
|
||||
ORDER BY offload_at, payload_id, payload_inserted_at, payload_type, tenant_id
|
||||
FOR UPDATE
|
||||
LIMIT @pollLimit::INT
|
||||
|
||||
;
|
||||
|
||||
-- name: FinalizePayloadOffloads :exec
|
||||
|
||||
Reference in New Issue
Block a user