mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-19 07:40:16 -06:00
Revert "feat: configurable max queue size and export batch size"
This reverts commit 7725ec3bab.
This commit is contained in:
@@ -3,7 +3,6 @@ package api
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/api/v1/server/run"
|
||||
@@ -17,8 +16,6 @@ func init() {
|
||||
insecure := os.Getenv("SERVER_OTEL_INSECURE")
|
||||
traceIDRatio := os.Getenv("SERVER_OTEL_TRACE_ID_RATIO")
|
||||
collectorAuth := os.Getenv("SERVER_OTEL_COLLECTOR_AUTH")
|
||||
unparsedMaxQueueSize := os.Getenv("SERVER_OTEL_EXPORTER_MAX_QUEUE_SIZE")
|
||||
unparsedMaxExportBatchSize := os.Getenv("SERVER_OTEL_EXPORTER_MAX_EXPORT_BATCH_SIZE")
|
||||
|
||||
var insecureBool bool
|
||||
|
||||
@@ -26,33 +23,14 @@ func init() {
|
||||
insecureBool = true
|
||||
}
|
||||
|
||||
var maxQueueSize, maxExportBatchSize *int
|
||||
if unparsedMaxQueueSize != "" {
|
||||
maxQueueSizeInt, err := strconv.Atoi(unparsedMaxQueueSize)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("could not parse SERVER_OTEL_EXPORTER_MAX_QUEUE_SIZE: %w", err))
|
||||
}
|
||||
maxQueueSize = &maxQueueSizeInt
|
||||
}
|
||||
|
||||
if unparsedMaxExportBatchSize != "" {
|
||||
maxExportBatchSizeInt, err := strconv.Atoi(unparsedMaxExportBatchSize)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("could not parse SERVER_OTEL_EXPORTER_MAX_EXPORT_BATCH_SIZE: %w", err))
|
||||
}
|
||||
maxExportBatchSize = &maxExportBatchSizeInt
|
||||
}
|
||||
|
||||
// we do this to we get the tracer set globally, which is needed by some of the otel
|
||||
// integrations for the database before start
|
||||
_, err := telemetry.InitTracer(&telemetry.TracerOpts{
|
||||
ServiceName: svcName,
|
||||
CollectorURL: collectorURL,
|
||||
TraceIdRatio: traceIDRatio,
|
||||
Insecure: insecureBool,
|
||||
CollectorAuth: collectorAuth,
|
||||
MaxQueueSize: maxQueueSize,
|
||||
MaxExportBatchSize: maxExportBatchSize,
|
||||
ServiceName: svcName,
|
||||
CollectorURL: collectorURL,
|
||||
TraceIdRatio: traceIDRatio,
|
||||
Insecure: insecureBool,
|
||||
CollectorAuth: collectorAuth,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -50,8 +49,6 @@ func init() {
|
||||
insecure := os.Getenv("SERVER_OTEL_INSECURE")
|
||||
traceIdRatio := os.Getenv("SERVER_OTEL_TRACE_ID_RATIO")
|
||||
collectorAuth := os.Getenv("SERVER_OTEL_COLLECTOR_AUTH")
|
||||
unparsedMaxQueueSize := os.Getenv("SERVER_OTEL_EXPORTER_MAX_QUEUE_SIZE")
|
||||
unparsedMaxExportBatchSize := os.Getenv("SERVER_OTEL_EXPORTER_MAX_EXPORT_BATCH_SIZE")
|
||||
|
||||
var insecureBool bool
|
||||
|
||||
@@ -59,33 +56,14 @@ func init() {
|
||||
insecureBool = true
|
||||
}
|
||||
|
||||
var maxQueueSize, maxExportBatchSize *int
|
||||
if unparsedMaxQueueSize != "" {
|
||||
maxQueueSizeInt, err := strconv.Atoi(unparsedMaxQueueSize)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("could not parse SERVER_OTEL_EXPORTER_MAX_QUEUE_SIZE: %w", err))
|
||||
}
|
||||
maxQueueSize = &maxQueueSizeInt
|
||||
}
|
||||
|
||||
if unparsedMaxExportBatchSize != "" {
|
||||
maxExportBatchSizeInt, err := strconv.Atoi(unparsedMaxExportBatchSize)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("could not parse SERVER_OTEL_EXPORTER_MAX_EXPORT_BATCH_SIZE: %w", err))
|
||||
}
|
||||
maxExportBatchSize = &maxExportBatchSizeInt
|
||||
}
|
||||
|
||||
// we do this to we get the tracer set globally, which is needed by some of the otel
|
||||
// integrations for the database before start
|
||||
_, err := telemetry.InitTracer(&telemetry.TracerOpts{
|
||||
ServiceName: svcName,
|
||||
CollectorURL: collectorURL,
|
||||
TraceIdRatio: traceIdRatio,
|
||||
Insecure: insecureBool,
|
||||
CollectorAuth: collectorAuth,
|
||||
MaxQueueSize: maxQueueSize,
|
||||
MaxExportBatchSize: maxExportBatchSize,
|
||||
ServiceName: svcName,
|
||||
CollectorURL: collectorURL,
|
||||
TraceIdRatio: traceIdRatio,
|
||||
Insecure: insecureBool,
|
||||
CollectorAuth: collectorAuth,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
@@ -156,13 +134,11 @@ func runV0Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
|
||||
var l = sc.Logger
|
||||
|
||||
shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{
|
||||
ServiceName: sc.OpenTelemetry.ServiceName,
|
||||
CollectorURL: sc.OpenTelemetry.CollectorURL,
|
||||
TraceIdRatio: sc.OpenTelemetry.TraceIdRatio,
|
||||
Insecure: sc.OpenTelemetry.Insecure,
|
||||
CollectorAuth: sc.OpenTelemetry.CollectorAuth,
|
||||
MaxQueueSize: sc.OpenTelemetry.ExporterMaxQueueSize,
|
||||
MaxExportBatchSize: sc.OpenTelemetry.ExporterMaxExportBatchSize,
|
||||
ServiceName: sc.OpenTelemetry.ServiceName,
|
||||
CollectorURL: sc.OpenTelemetry.CollectorURL,
|
||||
TraceIdRatio: sc.OpenTelemetry.TraceIdRatio,
|
||||
Insecure: sc.OpenTelemetry.Insecure,
|
||||
CollectorAuth: sc.OpenTelemetry.CollectorAuth,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize tracer: %w", err)
|
||||
@@ -641,13 +617,11 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
|
||||
var l = sc.Logger
|
||||
|
||||
shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{
|
||||
ServiceName: sc.OpenTelemetry.ServiceName,
|
||||
CollectorURL: sc.OpenTelemetry.CollectorURL,
|
||||
TraceIdRatio: sc.OpenTelemetry.TraceIdRatio,
|
||||
Insecure: sc.OpenTelemetry.Insecure,
|
||||
CollectorAuth: sc.OpenTelemetry.CollectorAuth,
|
||||
MaxQueueSize: sc.OpenTelemetry.ExporterMaxQueueSize,
|
||||
MaxExportBatchSize: sc.OpenTelemetry.ExporterMaxExportBatchSize,
|
||||
ServiceName: sc.OpenTelemetry.ServiceName,
|
||||
CollectorURL: sc.OpenTelemetry.CollectorURL,
|
||||
TraceIdRatio: sc.OpenTelemetry.TraceIdRatio,
|
||||
Insecure: sc.OpenTelemetry.Insecure,
|
||||
CollectorAuth: sc.OpenTelemetry.CollectorAuth,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize tracer: %w", err)
|
||||
|
||||
@@ -20,13 +20,11 @@ import (
|
||||
)
|
||||
|
||||
type TracerOpts struct {
|
||||
ServiceName string
|
||||
CollectorURL string
|
||||
Insecure bool
|
||||
TraceIdRatio string
|
||||
CollectorAuth string
|
||||
MaxQueueSize *int
|
||||
MaxExportBatchSize *int
|
||||
ServiceName string
|
||||
CollectorURL string
|
||||
Insecure bool
|
||||
TraceIdRatio string
|
||||
CollectorAuth string
|
||||
}
|
||||
|
||||
func InitTracer(opts *TracerOpts) (func(context.Context) error, error) {
|
||||
@@ -82,26 +80,13 @@ func InitTracer(opts *TracerOpts) (func(context.Context) error, error) {
|
||||
}
|
||||
}
|
||||
|
||||
maxQueueSize := sdktrace.DefaultMaxQueueSize
|
||||
|
||||
if opts.MaxQueueSize != nil {
|
||||
maxQueueSize = *opts.MaxQueueSize
|
||||
}
|
||||
|
||||
maxExportBatchSize := sdktrace.DefaultMaxExportBatchSize
|
||||
|
||||
if opts.MaxExportBatchSize != nil {
|
||||
maxExportBatchSize = *opts.MaxExportBatchSize
|
||||
}
|
||||
|
||||
otel.SetTracerProvider(
|
||||
sdktrace.NewTracerProvider(
|
||||
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(traceIdRatio)),
|
||||
sdktrace.WithBatcher(
|
||||
exporter,
|
||||
// Increasing queue size might help prevent spans from being dropped with smaller export batch sizes
|
||||
sdktrace.WithMaxQueueSize(maxQueueSize),
|
||||
sdktrace.WithMaxExportBatchSize(maxExportBatchSize),
|
||||
sdktrace.WithMaxQueueSize(sdktrace.DefaultMaxQueueSize*10),
|
||||
sdktrace.WithMaxExportBatchSize(sdktrace.DefaultMaxExportBatchSize*10),
|
||||
),
|
||||
sdktrace.WithResource(resources),
|
||||
),
|
||||
|
||||
@@ -20,13 +20,11 @@ type LoggerConfigFile struct {
|
||||
}
|
||||
|
||||
type OpenTelemetryConfigFile struct {
|
||||
CollectorURL string `mapstructure:"collectorURL" json:"collectorURL,omitempty"`
|
||||
ServiceName string `mapstructure:"serviceName" json:"serviceName,omitempty" default:"server"`
|
||||
TraceIdRatio string `mapstructure:"traceIdRatio" json:"traceIdRatio,omitempty" default:"1"`
|
||||
Insecure bool `mapstructure:"insecure" json:"insecure,omitempty" default:"false"`
|
||||
CollectorAuth string `mapstructure:"collectorAuth" json:"collectorAuth,omitempty"`
|
||||
ExporterMaxQueueSize *int `mapstructure:"maxQueueSize" json:"exporterMaxQueueSize,omitempty"`
|
||||
ExporterMaxExportBatchSize *int `mapstructure:"maxExportBatchSize" json:"exporterMaxExportBatchSize,omitempty"`
|
||||
CollectorURL string `mapstructure:"collectorURL" json:"collectorURL,omitempty"`
|
||||
ServiceName string `mapstructure:"serviceName" json:"serviceName,omitempty" default:"server"`
|
||||
TraceIdRatio string `mapstructure:"traceIdRatio" json:"traceIdRatio,omitempty" default:"1"`
|
||||
Insecure bool `mapstructure:"insecure" json:"insecure,omitempty" default:"false"`
|
||||
CollectorAuth string `mapstructure:"collectorAuth" json:"collectorAuth,omitempty"`
|
||||
}
|
||||
|
||||
type PrometheusConfigFile struct {
|
||||
|
||||
Reference in New Issue
Block a user