From c44c70bd0ca8bfcd5afddb28f98a035fbc35f482 Mon Sep 17 00:00:00 2001 From: matt Date: Mon, 4 Aug 2025 11:19:07 -0400 Subject: [PATCH] Debug: Add debug logs around put log method (#2079) * feat: add logger to ingestor * debug: add a bunch of debug logs * fix: add prefix for grep * fix: copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: panic --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/services/ingestor/ingestor.go | 7 +++++++ internal/services/ingestor/server.go | 2 ++ internal/services/ingestor/server_v1.go | 2 ++ pkg/repository/v1/task.go | 4 ++++ 4 files changed, 15 insertions(+) diff --git a/internal/services/ingestor/ingestor.go b/internal/services/ingestor/ingestor.go index ca9240603..4882bdf5a 100644 --- a/internal/services/ingestor/ingestor.go +++ b/internal/services/ingestor/ingestor.go @@ -5,6 +5,7 @@ import ( "fmt" lru "github.com/hashicorp/golang-lru/v2" + "github.com/rs/zerolog" "github.com/hatchet-dev/hatchet/internal/datautils" "github.com/hatchet-dev/hatchet/internal/msgqueue" @@ -12,6 +13,7 @@ import ( "github.com/hatchet-dev/hatchet/internal/services/ingestor/contracts" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" "github.com/hatchet-dev/hatchet/internal/telemetry" + "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/metered" "github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc" @@ -117,6 +119,7 @@ type IngestorImpl struct { mq msgqueue.MessageQueue mqv1 msgqueuev1.MessageQueue v validator.Validator + l *zerolog.Logger repov1 v1.Repository isLogIngestionEnabled bool @@ -156,6 +159,7 @@ func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) { if opts.stepRunRepository == nil { return nil, fmt.Errorf("step run repository is required. use WithStepRunRepository") } + // estimate of 1000 * 2 * UUID string size (roughly 104kb max) stepRunCache, err := lru.New[string, string](1000) @@ -163,6 +167,8 @@ func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) { return nil, fmt.Errorf("could not create step run cache: %w", err) } + logger := logger.NewDefaultLogger("ingestor-service") + return &IngestorImpl{ eventRepository: opts.eventRepository, streamEventRepository: opts.streamEventRepository, @@ -175,6 +181,7 @@ func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) { v: validator.NewDefaultValidator(), repov1: opts.repov1, isLogIngestionEnabled: opts.isLogIngestionEnabled, + l: &logger, }, nil } diff --git a/internal/services/ingestor/server.go b/internal/services/ingestor/server.go index 3746659fd..c4640679a 100644 --- a/internal/services/ingestor/server.go +++ b/internal/services/ingestor/server.go @@ -231,7 +231,9 @@ func (i *IngestorImpl) putStreamEventV0(ctx context.Context, tenant *dbsqlc.Tena } func (i *IngestorImpl) PutLog(ctx context.Context, req *contracts.PutLogRequest) (*contracts.PutLogResponse, error) { + i.l.Debug().Str("method", "PutLog").Str("stepRunId", req.StepRunId).Msg("loki-debug: handling PutLog request") tenant := ctx.Value("tenant").(*dbsqlc.Tenant) + i.l.Debug().Str("tenantId", sqlchelpers.UUIDToStr(tenant.ID)).Msg("loki-debug: tenant context for PutLog") switch tenant.Version { case dbsqlc.TenantMajorEngineVersionV0: diff --git a/internal/services/ingestor/server_v1.go b/internal/services/ingestor/server_v1.go index 2cb5f84d1..a8685a972 100644 --- a/internal/services/ingestor/server_v1.go +++ b/internal/services/ingestor/server_v1.go @@ -62,6 +62,7 @@ func (i *IngestorImpl) getSingleTask(ctx context.Context, tenantId, taskExternal } func (i *IngestorImpl) putLogV1(ctx context.Context, tenant *dbsqlc.Tenant, req *contracts.PutLogRequest) (*contracts.PutLogResponse, error) { + i.l.Debug().Str("method", "putLogV1").Str("stepRunId", req.StepRunId).Bool("isLogIngestionEnabled", i.isLogIngestionEnabled).Msg("loki-debug: handling putLogV1 call") tenantId := sqlchelpers.UUIDToStr(tenant.ID) if !i.isLogIngestionEnabled { @@ -74,6 +75,7 @@ func (i *IngestorImpl) putLogV1(ctx context.Context, tenant *dbsqlc.Tenant, req return nil, err } + i.l.Debug().Str("taskExternalId", sqlchelpers.UUIDToStr(task.ExternalID)).Msg("loki-debug: retrieved task for log ingestion") var createdAt *time.Time if t := req.CreatedAt.AsTime(); !t.IsZero() { diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 0ba815471..4fd5faafd 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -367,6 +367,8 @@ func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error { } func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error) { + r.l.Debug().Str("method", "GetTaskByExternalId").Str("taskExternalId", taskExternalId).Bool("skipCache", skipCache).Msg("loki-debug: retrieving task by external id") + if !skipCache { // check the cache first key := taskExternalIdTenantIdTuple{ @@ -385,6 +387,8 @@ func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, Externalids: []pgtype.UUID{sqlchelpers.UUIDFromStr(taskExternalId)}, }) + r.l.Debug().Bool("FlattenExternalIdsSucceeded", err == nil).Int("lenTasks", len(dbTasks)).Msg("loki-debug: executed FlattenExternalIds query") + if err != nil { return nil, err }