feat: add grpc otel spans, better tx debugging (#2474)

* feat: add grpc otel spans, better tx debugging

* fix: ctx
This commit is contained in:
abelanger5
2025-10-31 18:55:42 +01:00
committed by GitHub
parent 3a4f225788
commit 7603b5ef39
5 changed files with 27 additions and 3 deletions
+1
View File
@@ -153,6 +153,7 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
+2
View File
@@ -403,6 +403,8 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 h1:YH4g8lQroajqUwWbq/tr2QX1JFmEXaDLgG+ew9bLMWo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0/go.mod h1:fvPi2qXDqFs8M4B4fmJhE92TyQs9Ydjlg3RvfUp+NbQ=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 h1:Hf9xI/XLML9ElpiHVDNwvqI0hIFlzV8dgIr35kV1kRU=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0/go.mod h1:NfchwuyNoMcZ5MLHwPrODwUF1HWCXWrL31s8gSAdIKY=
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
+10 -3
View File
@@ -23,6 +23,7 @@ import (
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
"github.com/hatchet-dev/hatchet/pkg/telemetry"
msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1"
tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1"
@@ -328,7 +329,10 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV1(server contracts.Dispatcher_S
ringIndex := 0
ringMu := sync.Mutex{}
sendEvent := func(e *contracts.WorkflowRunEvent) error {
sendEvent := func(ctx context.Context, e *contracts.WorkflowRunEvent) error {
_, sendEventSpan := telemetry.NewSpan(ctx, "subscribe_to_workflow_runs_v1.send_event")
defer sendEventSpan.End()
results := s.cleanResults(e.Results)
if results == nil {
@@ -361,6 +365,9 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV1(server contracts.Dispatcher_S
return nil
}
iterCtx, iterSpan := telemetry.NewSpan(ctx, "subscribe_to_workflow_runs_v1.iter")
defer iterSpan.End()
bufferSize := 1000
if len(workflowRunIds) > bufferSize {
@@ -386,7 +393,7 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV1(server contracts.Dispatcher_S
start := time.Now()
finalizedWorkflowRuns, err := s.repov1.Tasks().ListFinalizedWorkflowRuns(ctx, tenantId, workflowRunIds)
finalizedWorkflowRuns, err := s.repov1.Tasks().ListFinalizedWorkflowRuns(iterCtx, tenantId, workflowRunIds)
if err != nil {
s.l.Error().Err(err).Msg("could not list finalized workflow runs")
@@ -405,7 +412,7 @@ func (s *DispatcherImpl) subscribeToWorkflowRunsV1(server contracts.Dispatcher_S
}
for _, event := range events {
err := sendEvent(event)
err := sendEvent(iterCtx, event)
if err != nil {
return err
+6
View File
@@ -35,6 +35,8 @@ import (
"github.com/hatchet-dev/hatchet/pkg/config/server"
"github.com/hatchet-dev/hatchet/pkg/errors"
"github.com/hatchet-dev/hatchet/pkg/logger"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
)
type Server struct {
@@ -311,6 +313,10 @@ func (s *Server) startGRPC() (func() error, error) {
s.config.Runtime.GRPCStaticStreamWindowSize,
))
serverOpts = append(serverOpts, grpc.StatsHandler(
otelgrpc.NewServerHandler(),
))
grpcServer := grpc.NewServer(serverOpts...)
if s.ingestor != nil {
@@ -18,6 +18,14 @@ func PrepareTx(ctx context.Context, pool *pgxpool.Pool, l *zerolog.Logger, timeo
tx, err := pool.Begin(ctx)
if err != nil {
if sinceStart := time.Since(start); sinceStart > 100*time.Millisecond {
l.Error().Dur(
"duration", sinceStart,
).Int(
"acquired_connections", int(pool.Stat().AcquiredConns()),
).Caller(1).Msgf("long transaction start with error: %v", err)
}
return nil, nil, nil, err
}