diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go index 71b3abc88..3dd1c75f5 100644 --- a/pkg/repository/v1/payloadstore.go +++ b/pkg/repository/v1/payloadstore.go @@ -233,6 +233,7 @@ func (p *payloadStoreRepositoryImpl) offloadToExternal(ctx context.Context, payl return nil, fmt.Errorf("external store not enabled") } + fmt.Println("offloading", len(payloads), "payloads to external store...") return p.externalStore.Store(ctx, payloads...) } @@ -242,6 +243,8 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part return false, nil } + fmt.Println("processing payload WAL...") + ctx, span := telemetry.NewSpan(ctx, "process-payload-wal") defer span.End() @@ -262,6 +265,8 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part Partitionnumber: partitionNumber, }) + fmt.Println("found", len(walRecords), "WAL records to process...") + hasMoreWALRecords := len(walRecords) == pollLimit if len(walRecords) == 0 { @@ -313,6 +318,8 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part retrieveOptsToStoredKey, err := p.offloadToExternal(ctx, externalStoreOpts...) + fmt.Println("offloaded", len(retrieveOptsToStoredKey), "payloads to external store...") + if err != nil { return false, err } @@ -353,6 +360,8 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part return false, fmt.Errorf("failed to prepare transaction for offloading: %w", err) } + fmt.Println("finalizing", len(ids), "offloaded payloads...") + err = p.queries.FinalizePayloadOffloads(ctx, tx, sqlcv1.FinalizePayloadOffloadsParams{ Ids: ids, Insertedats: insertedAts,