Deploy HyperDX locally via docker-compose and add traces to task controller (#2058)

* deploy Jaeger locally and add traces to task controller

* use Jaeger v2

* add SERVER_OTEL_COLLECTOR_AUTH

* fix PR comments

* fix span name
Mohammed Nafees
2025-07-29 16:24:38 +02:00
committed by GitHub
parent 549222c9d2
commit 793df41ccb
10 changed files with 119 additions and 39 deletions

View File

@@ -15,6 +15,7 @@ func init() {
collectorURL := os.Getenv("SERVER_OTEL_COLLECTOR_URL")
insecure := os.Getenv("SERVER_OTEL_INSECURE")
traceIDRatio := os.Getenv("SERVER_OTEL_TRACE_ID_RATIO")
collectorAuth := os.Getenv("SERVER_OTEL_COLLECTOR_AUTH")
var insecureBool bool
@@ -25,10 +26,11 @@ func init() {
// we do this so the tracer gets set globally, which is needed by some of the otel
// integrations for the database before start
_, err := telemetry.InitTracer(&telemetry.TracerOpts{
- ServiceName:  svcName,
- CollectorURL: collectorURL,
- TraceIdRatio: traceIDRatio,
- Insecure:     insecureBool,
+ ServiceName:   svcName,
+ CollectorURL:  collectorURL,
+ TraceIdRatio:  traceIDRatio,
+ Insecure:      insecureBool,
+ CollectorAuth: collectorAuth,
})
if err != nil {
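
The hunk above elides how insecureBool is derived from the SERVER_OTEL_INSECURE string between the two @@ markers. A minimal sketch of that parsing, assuming a strconv.ParseBool with a fall-back to false (the helper name and package are illustrative, not taken from the commit):

package otelenv

import "strconv"

// parseInsecure mirrors the elided SERVER_OTEL_INSECURE handling: any value
// that does not parse as a bool is treated as false.
func parseInsecure(insecure string) bool {
	parsed, err := strconv.ParseBool(insecure)
	if err != nil {
		return false
	}
	return parsed
}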

View File

@@ -48,6 +48,7 @@ func init() {
collectorURL := os.Getenv("SERVER_OTEL_COLLECTOR_URL")
insecure := os.Getenv("SERVER_OTEL_INSECURE")
traceIdRatio := os.Getenv("SERVER_OTEL_TRACE_ID_RATIO")
collectorAuth := os.Getenv("SERVER_OTEL_COLLECTOR_AUTH")
var insecureBool bool
@@ -58,10 +59,11 @@ func init() {
// we do this so the tracer gets set globally, which is needed by some of the otel
// integrations for the database before start
_, err := telemetry.InitTracer(&telemetry.TracerOpts{
- ServiceName:  svcName,
- CollectorURL: collectorURL,
- TraceIdRatio: traceIdRatio,
- Insecure:     insecureBool,
+ ServiceName:   svcName,
+ CollectorURL:  collectorURL,
+ TraceIdRatio:  traceIdRatio,
+ Insecure:      insecureBool,
+ CollectorAuth: collectorAuth,
})
if err != nil {
@@ -132,10 +134,11 @@ func runV0Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
var l = sc.Logger
shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{
- ServiceName:  sc.OpenTelemetry.ServiceName,
- CollectorURL: sc.OpenTelemetry.CollectorURL,
- TraceIdRatio: sc.OpenTelemetry.TraceIdRatio,
- Insecure:     sc.OpenTelemetry.Insecure,
+ ServiceName:   sc.OpenTelemetry.ServiceName,
+ CollectorURL:  sc.OpenTelemetry.CollectorURL,
+ TraceIdRatio:  sc.OpenTelemetry.TraceIdRatio,
+ Insecure:      sc.OpenTelemetry.Insecure,
+ CollectorAuth: sc.OpenTelemetry.CollectorAuth,
})
if err != nil {
return nil, fmt.Errorf("could not initialize tracer: %w", err)
@@ -614,10 +617,11 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
var l = sc.Logger
shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{
- ServiceName:  sc.OpenTelemetry.ServiceName,
- CollectorURL: sc.OpenTelemetry.CollectorURL,
- TraceIdRatio: sc.OpenTelemetry.TraceIdRatio,
- Insecure:     sc.OpenTelemetry.Insecure,
+ ServiceName:   sc.OpenTelemetry.ServiceName,
+ CollectorURL:  sc.OpenTelemetry.CollectorURL,
+ TraceIdRatio:  sc.OpenTelemetry.TraceIdRatio,
+ Insecure:      sc.OpenTelemetry.Insecure,
+ CollectorAuth: sc.OpenTelemetry.CollectorAuth,
})
if err != nil {
return nil, fmt.Errorf("could not initialize tracer: %w", err)
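
With CollectorAuth threaded through both the env-var path and the server config path, a direct call to telemetry.InitTracer looks roughly like the sketch below. The service name and endpoint are placeholders, and the internal/telemetry import only resolves from inside the hatchet module:

package main

import (
	"context"
	"log"
	"os"

	"github.com/hatchet-dev/hatchet/internal/telemetry"
)

func main() {
	shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{
		ServiceName:   "hatchet-api",    // placeholder
		CollectorURL:  "localhost:4317", // local HyperDX OTLP gRPC endpoint
		TraceIdRatio:  "1",
		Insecure:      true,
		CollectorAuth: os.Getenv("SERVER_OTEL_COLLECTOR_AUTH"), // forwarded as the Authorization header
	})
	if err != nil {
		log.Fatalf("could not initialize tracer: %v", err)
	}
	defer shutdown(context.Background()) //nolint:errcheck
}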

View File

@@ -33,6 +33,19 @@ services:
depends_on:
- prometheus
hyperdx:
image: docker.hyperdx.io/hyperdx/hyperdx-all-in-one:latest
container_name: hatchet-hyperdx
ports:
- "8081:8080" # HyperDX UI
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
volumes:
- hatchet_hyperdx_data:/data/db
- hatchet_hyperdx_data:/var/lib/clickhouse
- hatchet_hyperdx_data:/var/log/clickhouse-server
volumes:
hatchet_prometheus_data:
hatchet_grafana_data:
hatchet_hyperdx_data:
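
The hyperdx service publishes the HyperDX UI on localhost:8081 and OTLP ingest on 4317 (gRPC) and 4318 (HTTP); the server is pointed at it through the env vars from the hunks above, typically SERVER_OTEL_COLLECTOR_URL=localhost:4317 with SERVER_OTEL_INSECURE=true. A quick reachability check for the gRPC port, as a sketch that is not part of the commit:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Dial the OTLP gRPC port published by the hyperdx container.
	conn, err := grpc.DialContext(ctx, "localhost:4317",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
	if err != nil {
		log.Fatalf("OTLP endpoint not reachable: %v", err)
	}
	defer conn.Close()
	log.Println("HyperDX OTLP gRPC endpoint is up")
}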

View File

@@ -32,7 +32,7 @@ func (oc *OLAPControllerImpl) runOLAPTablePartition(ctx context.Context) func()
}
func (oc *OLAPControllerImpl) createTablePartition(ctx context.Context) error {
ctx, span := telemetry.NewSpan(ctx, "create-table-partition")
ctx, span := telemetry.NewSpan(ctx, "OLAPControllerImpl.createTablePartition")
defer span.End()
err := oc.repo.OLAP().UpdateTablePartitions(ctx)

View File

@@ -11,6 +11,7 @@ import (
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/codes"
"golang.org/x/sync/errgroup"
"github.com/hatchet-dev/hatchet/internal/cel"
@@ -20,6 +21,7 @@ import (
"github.com/hatchet-dev/hatchet/internal/services/partition"
"github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils"
tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/config/server"
"github.com/hatchet-dev/hatchet/pkg/config/shared"
hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors"
@@ -252,80 +254,120 @@ func (tc *TasksControllerImpl) Start() (func() error, error) {
ctx, cancel := context.WithCancel(context.Background())
spanContext, span := telemetry.NewSpan(ctx, "TasksControllerImpl.Start")
_, err = tc.s.NewJob(
gocron.DurationJob(tc.opsPoolPollInterval),
gocron.NewTask(
- tc.runTenantTimeoutTasks(ctx),
+ tc.runTenantTimeoutTasks(spanContext),
),
)
if err != nil {
wrappedErr := fmt.Errorf("could not schedule step run timeout: %w", err)
cancel()
return nil, fmt.Errorf("could not schedule step run timeout: %w", err)
span.RecordError(err)
span.SetStatus(codes.Error, "could not schedule step run timeout")
span.End()
return nil, wrappedErr
}
_, err = tc.s.NewJob(
gocron.DurationJob(tc.opsPoolPollInterval),
gocron.NewTask(
- tc.runTenantSleepEmitter(ctx),
+ tc.runTenantSleepEmitter(spanContext),
),
)
if err != nil {
wrappedErr := fmt.Errorf("could not schedule step run emit sleep: %w", err)
cancel()
return nil, fmt.Errorf("could not schedule step run emit sleep: %w", err)
span.RecordError(err)
span.SetStatus(codes.Error, "could not schedule step run emit sleep")
span.End()
return nil, wrappedErr
}
_, err = tc.s.NewJob(
gocron.DurationJob(tc.opsPoolPollInterval),
gocron.NewTask(
- tc.runTenantReassignTasks(ctx),
+ tc.runTenantReassignTasks(spanContext),
),
)
if err != nil {
wrappedErr := fmt.Errorf("could not schedule step run reassignment: %w", err)
cancel()
return nil, fmt.Errorf("could not schedule step run reassignment: %w", err)
span.RecordError(err)
span.SetStatus(codes.Error, "could not schedule step run reassignment")
span.End()
return nil, wrappedErr
}
_, err = tc.s.NewJob(
gocron.DurationJob(tc.opsPoolPollInterval),
gocron.NewTask(
- tc.runTenantRetryQueueItems(ctx),
+ tc.runTenantRetryQueueItems(spanContext),
),
)
if err != nil {
wrappedErr := fmt.Errorf("could not schedule step run retry queue items: %w", err)
cancel()
return nil, fmt.Errorf("could not schedule step run reassignment: %w", err)
span.RecordError(err)
span.SetStatus(codes.Error, "could not schedule step run retry queue items")
span.End()
return nil, wrappedErr
}
_, err = tc.s.NewJob(
gocron.DurationJob(time.Minute*15),
gocron.NewTask(
- tc.runTaskTablePartition(ctx),
+ tc.runTaskTablePartition(spanContext),
),
gocron.WithSingletonMode(gocron.LimitModeReschedule),
)
if err != nil {
wrappedErr := fmt.Errorf("could not schedule task partition method: %w", err)
cancel()
return nil, fmt.Errorf("could not schedule task partition method: %w", err)
span.RecordError(err)
span.SetStatus(codes.Error, "could not schedule task partition method")
span.End()
return nil, wrappedErr
}
cleanup := func() error {
cancel()
if err := cleanupBuffer(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "could not cleanup buffer")
return err
}
tc.pubBuffer.Stop()
if err := tc.s.Shutdown(); err != nil {
return fmt.Errorf("could not shutdown scheduler: %w", err)
err := fmt.Errorf("could not shutdown scheduler: %w", err)
span.RecordError(err)
span.SetStatus(codes.Error, "could not shutdown scheduler")
return err
}
span.End()
return nil
}
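
Every scheduling failure above repeats the same steps: wrap the error, cancel the context, record the error on the Start span, mark the span as errored, end it, and return the wrapped error. A condensed sketch of that pattern, with a hypothetical helper that is not part of the commit:

package task

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

// failStart condenses the repeated error path in Start into one place.
func failStart(span trace.Span, cancel context.CancelFunc, err error, msg string) error {
	wrappedErr := fmt.Errorf("%s: %w", msg, err)
	cancel()
	span.RecordError(err)
	span.SetStatus(codes.Error, msg)
	span.End()
	return wrappedErr
}

Each call site would then read return nil, failStart(span, cancel, err, "could not schedule step run timeout") and so on.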

View File

@@ -2,21 +2,27 @@ package task
import (
"context"
"fmt"
"time"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"go.opentelemetry.io/otel/codes"
)
func (tc *TasksControllerImpl) runTaskTablePartition(ctx context.Context) func() {
return func() {
ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.runTaskTablePartition")
defer span.End()
tc.l.Debug().Msgf("partition: running task table partition")
// get internal tenant
tenant, err := tc.p.GetInternalTenantForController(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "could not get internal tenant")
tc.l.Error().Err(err).Msg("could not get internal tenant")
return
}
@@ -27,13 +33,15 @@ func (tc *TasksControllerImpl) runTaskTablePartition(ctx context.Context) func()
err = tc.createTablePartition(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "could not create table partition")
tc.l.Error().Err(err).Msg("could not create table partition")
}
}
}
func (tc *TasksControllerImpl) createTablePartition(ctx context.Context) error {
ctx, span := telemetry.NewSpan(ctx, "create-table-partition")
ctx, span := telemetry.NewSpan(ctx, "TasksControllerImpl.createTablePartition")
defer span.End()
qCtx, qCancel := context.WithTimeout(ctx, 10*time.Minute)
@@ -42,7 +50,9 @@ func (tc *TasksControllerImpl) createTablePartition(ctx context.Context) error {
err := tc.repov1.Tasks().UpdateTablePartitions(qCtx)
if err != nil {
return fmt.Errorf("could not create table partition: %w", err)
span.RecordError(err)
span.SetStatus(codes.Error, "could not create table partition")
return err
}
return nil
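
createTablePartition now follows the same shape as the OLAP controller version above: open a span from the incoming context, bound the query with its own timeout, and record any failure on the span before returning it. A generic sketch of that shape, with an illustrative wrapper that only compiles inside the hatchet module and is not part of the commit:

package task

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/codes"

	"github.com/hatchet-dev/hatchet/internal/telemetry"
)

// runWithSpan wraps a repository call in a named span and a bounded context.
func runWithSpan(ctx context.Context, name string, op func(context.Context) error) error {
	ctx, span := telemetry.NewSpan(ctx, name)
	defer span.End()

	// Bound the underlying query independently of the caller's deadline.
	qCtx, qCancel := context.WithTimeout(ctx, 10*time.Minute)
	defer qCancel()

	if err := op(qCtx); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, name+" failed")
		return err
	}
	return nil
}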

View File

@@ -169,6 +169,9 @@ func (p *Partition) StartControllerPartition(ctx context.Context) (func() error,
}
func (p *Partition) GetInternalTenantForController(ctx context.Context) (*dbsqlc.Tenant, error) {
ctx, span := telemetry.NewSpan(ctx, "Partition.GetInternalTenantForController")
defer span.End()
return p.repo.GetInternalTenantForController(ctx, p.GetControllerPartitionId())
}

View File

@@ -20,10 +20,11 @@ import (
)
type TracerOpts struct {
- ServiceName  string
- CollectorURL string
- Insecure     bool
- TraceIdRatio string
+ ServiceName   string
+ CollectorURL  string
+ Insecure      bool
+ TraceIdRatio  string
+ CollectorAuth string
}
func InitTracer(opts *TracerOpts) (func(context.Context) error, error) {
@@ -47,6 +48,9 @@ func InitTracer(opts *TracerOpts) (func(context.Context) error, error) {
otlptracegrpc.NewClient(
secureOption,
otlptracegrpc.WithEndpoint(opts.CollectorURL),
otlptracegrpc.WithHeaders(map[string]string{
"Authorization": opts.CollectorAuth,
}),
),
)
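
InitTracer now attaches the raw CollectorAuth value as the Authorization header on every OTLP export; as the hunk reads, the header is added even when the value is empty. Stripped of the surrounding setup, the exporter wiring corresponds roughly to this sketch built on the public OTLP gRPC exporter API (endpoint and header value are placeholders):

package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
)

func main() {
	client := otlptracegrpc.NewClient(
		otlptracegrpc.WithInsecure(), // the Insecure path; otherwise TLS credentials are used
		otlptracegrpc.WithEndpoint("localhost:4317"),
		otlptracegrpc.WithHeaders(map[string]string{
			"Authorization": "<value of SERVER_OTEL_COLLECTOR_AUTH>",
		}),
	)

	exporter, err := otlptrace.New(context.Background(), client)
	if err != nil {
		log.Fatal(err)
	}
	defer exporter.Shutdown(context.Background()) //nolint:errcheck
}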

View File

@@ -754,6 +754,7 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("otel.collectorURL", "SERVER_OTEL_COLLECTOR_URL")
_ = v.BindEnv("otel.traceIdRatio", "SERVER_OTEL_TRACE_ID_RATIO")
_ = v.BindEnv("otel.insecure", "SERVER_OTEL_INSECURE")
_ = v.BindEnv("otel.collectorAuth", "SERVER_OTEL_COLLECTOR_AUTH")
// prometheus options
_ = v.BindEnv("prometheus.prometheusServerURL", "SERVER_PROMETHEUS_SERVER_URL")

View File

@@ -20,10 +20,11 @@ type LoggerConfigFile struct {
}
type OpenTelemetryConfigFile struct {
- CollectorURL string `mapstructure:"collectorURL" json:"collectorURL,omitempty"`
- ServiceName  string `mapstructure:"serviceName" json:"serviceName,omitempty" default:"server"`
- TraceIdRatio string `mapstructure:"traceIdRatio" json:"traceIdRatio,omitempty" default:"1"`
- Insecure     bool   `mapstructure:"insecure" json:"insecure,omitempty" default:"false"`
+ CollectorURL  string `mapstructure:"collectorURL" json:"collectorURL,omitempty"`
+ ServiceName   string `mapstructure:"serviceName" json:"serviceName,omitempty" default:"server"`
+ TraceIdRatio  string `mapstructure:"traceIdRatio" json:"traceIdRatio,omitempty" default:"1"`
+ Insecure      bool   `mapstructure:"insecure" json:"insecure,omitempty" default:"false"`
+ CollectorAuth string `mapstructure:"collectorAuth" json:"collectorAuth,omitempty"`
}
type PrometheusConfigFile struct {