mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-08 01:39:46 -06:00
debug: add some logging
This commit is contained in:
@@ -233,6 +233,7 @@ func (p *payloadStoreRepositoryImpl) offloadToExternal(ctx context.Context, payl
|
||||
return nil, fmt.Errorf("external store not enabled")
|
||||
}
|
||||
|
||||
fmt.Println("offloading", len(payloads), "payloads to external store...")
|
||||
return p.externalStore.Store(ctx, payloads...)
|
||||
}
|
||||
|
||||
@@ -242,6 +243,8 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
|
||||
return false, nil
|
||||
}
|
||||
|
||||
fmt.Println("processing payload WAL...")
|
||||
|
||||
ctx, span := telemetry.NewSpan(ctx, "process-payload-wal")
|
||||
defer span.End()
|
||||
|
||||
@@ -262,6 +265,8 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
|
||||
Partitionnumber: partitionNumber,
|
||||
})
|
||||
|
||||
fmt.Println("found", len(walRecords), "WAL records to process...")
|
||||
|
||||
hasMoreWALRecords := len(walRecords) == pollLimit
|
||||
|
||||
if len(walRecords) == 0 {
|
||||
@@ -313,6 +318,8 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
|
||||
|
||||
retrieveOptsToStoredKey, err := p.offloadToExternal(ctx, externalStoreOpts...)
|
||||
|
||||
fmt.Println("offloaded", len(retrieveOptsToStoredKey), "payloads to external store...")
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -353,6 +360,8 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
|
||||
return false, fmt.Errorf("failed to prepare transaction for offloading: %w", err)
|
||||
}
|
||||
|
||||
fmt.Println("finalizing", len(ids), "offloaded payloads...")
|
||||
|
||||
err = p.queries.FinalizePayloadOffloads(ctx, tx, sqlcv1.FinalizePayloadOffloadsParams{
|
||||
Ids: ids,
|
||||
Insertedats: insertedAts,
|
||||
|
||||
Reference in New Issue
Block a user