debug: rm pub buffer param

This commit is contained in:
mrkaye97
2025-10-10 20:16:44 -04:00
parent ba05cd9ced
commit b42a5cacbb
2 changed files with 7 additions and 5 deletions

View File

@@ -19,7 +19,7 @@ func (tc *TasksControllerImpl) runProcessPayloadWAL(ctx context.Context) func()
}
func (tc *TasksControllerImpl) processPayloadWAL(ctx context.Context, partitionNumber int64) (bool, error) {
return tc.repov1.Payloads().ProcessPayloadWAL(ctx, partitionNumber, tc.pubBuffer)
return tc.repov1.Payloads().ProcessPayloadWAL(ctx, partitionNumber)
}
func (tc *TasksControllerImpl) runProcessPayloadExternalCutovers(ctx context.Context) func() {

View File

@@ -2,11 +2,11 @@ package v1
import (
"context"
"encoding/json"
"fmt"
"sort"
"time"
msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
@@ -59,7 +59,7 @@ type ExternalStore interface {
type PayloadStoreRepository interface {
Store(ctx context.Context, tx sqlcv1.DBTX, payloads ...StorePayloadOpts) error
BulkRetrieve(ctx context.Context, opts ...RetrievePayloadOpts) (map[RetrievePayloadOpts][]byte, error)
ProcessPayloadWAL(ctx context.Context, partitionNumber int64, pubBuffer *msgqueue.MQPubBuffer) (bool, error)
ProcessPayloadWAL(ctx context.Context, partitionNumber int64) (bool, error)
ProcessPayloadExternalCutovers(ctx context.Context, partitionNumber int64) (bool, error)
OverwriteExternalStore(store ExternalStore, inlineStoreTTL time.Duration)
DualWritesEnabled() bool
@@ -294,7 +294,7 @@ func (p *payloadStoreRepositoryImpl) offloadToExternal(ctx context.Context, payl
return p.externalStore.Store(ctx, payloads...)
}
func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, partitionNumber int64, pubBuffer *msgqueue.MQPubBuffer) (bool, error) {
func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, partitionNumber int64) (bool, error) {
// no need to process the WAL if external store is not enabled
if !p.externalStoreEnabled {
return false, nil
@@ -483,7 +483,9 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
if err != nil {
return false, fmt.Errorf("failed to create OLAP payload offload message: %w", err)
}
pubBuffer.Pub(ctx, msgqueue.OLAP_QUEUE, msg, false)
mj, _ := json.Marshal(msg)
fmt.Println(string(mj))
// pubBuffer.Pub(ctx, msgqueue.OLAP_QUEUE, msg, false)
}
if err := commit(ctx); err != nil {