mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-03-13 13:38:53 -05:00
Fix: WAL partition poll function type (#2301)
* fix: type * fix: cast to int32 * debug: add logging * debug: more logs * Revert "debug: more logs" This reverts commit2ff8033f89. * Revert "debug: add logging" This reverts commita7aaa05b9c. * fix: rm unnecessary generic * feat: span attrs + names * fix: span naming, more details
This commit is contained in:
@@ -231,7 +231,7 @@ func New(fs ...TasksControllerOpt) (*TasksControllerImpl, error) {
|
||||
t.emitSleepOperations = queueutils.NewOperationPool(opts.l, timeout, "emit sleep step runs", t.processSleeps).WithJitter(jitter)
|
||||
t.reassignTaskOperations = queueutils.NewOperationPool(opts.l, timeout, "reassign step runs", t.processTaskReassignments).WithJitter(jitter)
|
||||
t.retryTaskOperations = queueutils.NewOperationPool(opts.l, timeout, "retry step runs", t.processTaskRetryQueueItems).WithJitter(jitter)
|
||||
t.processPayloadWALOperations = queueutils.NewOperationPool[int64](opts.l, timeout, "process payload WAL", t.processPayloadWAL).WithJitter(jitter)
|
||||
t.processPayloadWALOperations = queueutils.NewOperationPool(opts.l, timeout, "process payload WAL", t.processPayloadWAL).WithJitter(jitter)
|
||||
t.evictExpiredIdempotencyKeysOperations = queueutils.NewOperationPool(opts.l, timeout, "evict expired idempotency keys", t.evictExpiredIdempotencyKeys).WithJitter(jitter)
|
||||
|
||||
return t, nil
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/rs/zerolog"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
)
|
||||
|
||||
type StorePayloadOpts struct {
|
||||
@@ -236,6 +237,11 @@ func (p *payloadStoreRepositoryImpl) bulkRetrieve(ctx context.Context, tx sqlcv1
|
||||
}
|
||||
|
||||
func (p *payloadStoreRepositoryImpl) offloadToExternal(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error) {
|
||||
ctx, span := telemetry.NewSpan(ctx, "payloadstore.offload_to_external_store")
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes(attribute.Int("payloadstore.offload_to_external_store.num_payloads_to_offload", len(payloads)))
|
||||
|
||||
// this is only intended to be called from ProcessPayloadWAL, which short-circuits if external store is not enabled
|
||||
if !p.externalStoreEnabled {
|
||||
return nil, fmt.Errorf("external store not enabled")
|
||||
@@ -250,7 +256,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
|
||||
return false, nil
|
||||
}
|
||||
|
||||
ctx, span := telemetry.NewSpan(ctx, "process-payload-wal")
|
||||
ctx, span := telemetry.NewSpan(ctx, "payloadstore.process_payload_wal")
|
||||
defer span.End()
|
||||
|
||||
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 5000)
|
||||
@@ -275,7 +281,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
|
||||
|
||||
walRecords, err := p.queries.PollPayloadWALForRecordsToOffload(ctx, tx, sqlcv1.PollPayloadWALForRecordsToOffloadParams{
|
||||
Polllimit: int32(pollLimit),
|
||||
Partitionnumber: partitionNumber,
|
||||
Partitionnumber: int32(partitionNumber),
|
||||
})
|
||||
|
||||
hasMoreWALRecords := len(walRecords) == pollLimit
|
||||
@@ -310,6 +316,25 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
|
||||
}
|
||||
|
||||
externalStoreOpts := make([]OffloadToExternalStoreOpts, 0)
|
||||
minOffloadAt := time.Now().Add(100 * time.Hour)
|
||||
offloadLag := time.Since(minOffloadAt).Seconds()
|
||||
|
||||
attrs := []attribute.KeyValue{
|
||||
{
|
||||
Key: "payloadstore.process_payload_wal.payload_wal_offload_partition_number",
|
||||
Value: attribute.Int64Value(partitionNumber),
|
||||
},
|
||||
{
|
||||
Key: "payloadstore.process_payload_wal.payload_wal_offload_count",
|
||||
Value: attribute.IntValue(len(retrieveOpts)),
|
||||
},
|
||||
{
|
||||
Key: "payloadstore.process_payload_wal.payload_wal_offload_lag_seconds",
|
||||
Value: attribute.Float64Value(offloadLag),
|
||||
},
|
||||
}
|
||||
|
||||
span.SetAttributes(attrs...)
|
||||
|
||||
for opts, payload := range payloads {
|
||||
offloadAt, ok := retrieveOptsToOffloadAt[opts]
|
||||
@@ -328,6 +353,10 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part
|
||||
},
|
||||
OffloadAt: offloadAt.Time,
|
||||
})
|
||||
|
||||
if offloadAt.Time.Before(minOffloadAt) {
|
||||
minOffloadAt = offloadAt.Time
|
||||
}
|
||||
}
|
||||
|
||||
if err := commit(ctx); err != nil {
|
||||
|
||||
@@ -93,7 +93,7 @@ FROM
|
||||
WITH tenants AS (
|
||||
SELECT UNNEST(
|
||||
find_matching_tenants_in_payload_wal_partition(
|
||||
@partitionNumber::BIGINT
|
||||
@partitionNumber::INT
|
||||
)
|
||||
) AS tenant_id
|
||||
)
|
||||
|
||||
@@ -67,7 +67,7 @@ const pollPayloadWALForRecordsToOffload = `-- name: PollPayloadWALForRecordsToOf
|
||||
WITH tenants AS (
|
||||
SELECT UNNEST(
|
||||
find_matching_tenants_in_payload_wal_partition(
|
||||
$2::BIGINT
|
||||
$2::INT
|
||||
)
|
||||
) AS tenant_id
|
||||
)
|
||||
@@ -84,7 +84,7 @@ LIMIT $1::INT
|
||||
|
||||
type PollPayloadWALForRecordsToOffloadParams struct {
|
||||
Polllimit int32 `json:"polllimit"`
|
||||
Partitionnumber int64 `json:"partitionnumber"`
|
||||
Partitionnumber int32 `json:"partitionnumber"`
|
||||
}
|
||||
|
||||
func (q *Queries) PollPayloadWALForRecordsToOffload(ctx context.Context, db DBTX, arg PollPayloadWALForRecordsToOffloadParams) ([]*V1PayloadWal, error) {
|
||||
|
||||
Reference in New Issue
Block a user