mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-08 01:39:46 -06:00
Debug: Add debug logs around put log method (#2079)
* feat: add logger to ingestor * debug: add a bunch of debug logs * fix: add prefix for grep * fix: copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: panic --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/internal/datautils"
|
||||
"github.com/hatchet-dev/hatchet/internal/msgqueue"
|
||||
@@ -12,6 +13,7 @@ import (
|
||||
"github.com/hatchet-dev/hatchet/internal/services/ingestor/contracts"
|
||||
"github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes"
|
||||
"github.com/hatchet-dev/hatchet/internal/telemetry"
|
||||
"github.com/hatchet-dev/hatchet/pkg/logger"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/metered"
|
||||
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
|
||||
@@ -117,6 +119,7 @@ type IngestorImpl struct {
|
||||
mq msgqueue.MessageQueue
|
||||
mqv1 msgqueuev1.MessageQueue
|
||||
v validator.Validator
|
||||
l *zerolog.Logger
|
||||
repov1 v1.Repository
|
||||
|
||||
isLogIngestionEnabled bool
|
||||
@@ -156,6 +159,7 @@ func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) {
|
||||
if opts.stepRunRepository == nil {
|
||||
return nil, fmt.Errorf("step run repository is required. use WithStepRunRepository")
|
||||
}
|
||||
|
||||
// estimate of 1000 * 2 * UUID string size (roughly 104kb max)
|
||||
stepRunCache, err := lru.New[string, string](1000)
|
||||
|
||||
@@ -163,6 +167,8 @@ func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) {
|
||||
return nil, fmt.Errorf("could not create step run cache: %w", err)
|
||||
}
|
||||
|
||||
logger := logger.NewDefaultLogger("ingestor-service")
|
||||
|
||||
return &IngestorImpl{
|
||||
eventRepository: opts.eventRepository,
|
||||
streamEventRepository: opts.streamEventRepository,
|
||||
@@ -175,6 +181,7 @@ func NewIngestor(fs ...IngestorOptFunc) (Ingestor, error) {
|
||||
v: validator.NewDefaultValidator(),
|
||||
repov1: opts.repov1,
|
||||
isLogIngestionEnabled: opts.isLogIngestionEnabled,
|
||||
l: &logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -231,7 +231,9 @@ func (i *IngestorImpl) putStreamEventV0(ctx context.Context, tenant *dbsqlc.Tena
|
||||
}
|
||||
|
||||
func (i *IngestorImpl) PutLog(ctx context.Context, req *contracts.PutLogRequest) (*contracts.PutLogResponse, error) {
|
||||
i.l.Debug().Str("method", "PutLog").Str("stepRunId", req.StepRunId).Msg("loki-debug: handling PutLog request")
|
||||
tenant := ctx.Value("tenant").(*dbsqlc.Tenant)
|
||||
i.l.Debug().Str("tenantId", sqlchelpers.UUIDToStr(tenant.ID)).Msg("loki-debug: tenant context for PutLog")
|
||||
|
||||
switch tenant.Version {
|
||||
case dbsqlc.TenantMajorEngineVersionV0:
|
||||
|
||||
@@ -62,6 +62,7 @@ func (i *IngestorImpl) getSingleTask(ctx context.Context, tenantId, taskExternal
|
||||
}
|
||||
|
||||
func (i *IngestorImpl) putLogV1(ctx context.Context, tenant *dbsqlc.Tenant, req *contracts.PutLogRequest) (*contracts.PutLogResponse, error) {
|
||||
i.l.Debug().Str("method", "putLogV1").Str("stepRunId", req.StepRunId).Bool("isLogIngestionEnabled", i.isLogIngestionEnabled).Msg("loki-debug: handling putLogV1 call")
|
||||
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
|
||||
|
||||
if !i.isLogIngestionEnabled {
|
||||
@@ -74,6 +75,7 @@ func (i *IngestorImpl) putLogV1(ctx context.Context, tenant *dbsqlc.Tenant, req
|
||||
return nil, err
|
||||
}
|
||||
|
||||
i.l.Debug().Str("taskExternalId", sqlchelpers.UUIDToStr(task.ExternalID)).Msg("loki-debug: retrieved task for log ingestion")
|
||||
var createdAt *time.Time
|
||||
|
||||
if t := req.CreatedAt.AsTime(); !t.IsZero() {
|
||||
|
||||
@@ -367,6 +367,8 @@ func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error) {
|
||||
r.l.Debug().Str("method", "GetTaskByExternalId").Str("taskExternalId", taskExternalId).Bool("skipCache", skipCache).Msg("loki-debug: retrieving task by external id")
|
||||
|
||||
if !skipCache {
|
||||
// check the cache first
|
||||
key := taskExternalIdTenantIdTuple{
|
||||
@@ -385,6 +387,8 @@ func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId,
|
||||
Externalids: []pgtype.UUID{sqlchelpers.UUIDFromStr(taskExternalId)},
|
||||
})
|
||||
|
||||
r.l.Debug().Bool("FlattenExternalIdsSucceeded", err == nil).Int("lenTasks", len(dbTasks)).Msg("loki-debug: executed FlattenExternalIds query")
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user