From b4670af138317691dd4c4cbfbd734b72b9102950 Mon Sep 17 00:00:00 2001 From: Gabe Ruttner Date: Tue, 30 Jul 2024 15:11:10 -0700 Subject: [PATCH] Fix qos otel config (#754) * feat: otel trace id ratio * feat: rabbitmq qos * feat: requeue limit * fix: tests --- cmd/hatchet-engine/engine/run.go | 3 +++ internal/msgqueue/rabbitmq/rabbitmq.go | 12 +++++++++++- internal/msgqueue/rabbitmq/rabbitmq_test.go | 3 +++ internal/telemetry/telemetry.go | 14 +++++++++++++- pkg/config/loader/loader.go | 3 ++- pkg/config/server/server.go | 9 +++++++++ pkg/config/shared/shared.go | 1 + pkg/repository/prisma/repository.go | 4 ++-- pkg/repository/prisma/step_run.go | 9 ++++++--- 9 files changed, 50 insertions(+), 8 deletions(-) diff --git a/cmd/hatchet-engine/engine/run.go b/cmd/hatchet-engine/engine/run.go index 3e24c5aa0..0b05534c6 100644 --- a/cmd/hatchet-engine/engine/run.go +++ b/cmd/hatchet-engine/engine/run.go @@ -34,12 +34,14 @@ type Teardown struct { func init() { svcName := os.Getenv("SERVER_OTEL_SERVICE_NAME") collectorURL := os.Getenv("SERVER_OTEL_COLLECTOR_URL") + traceIdRatio := os.Getenv("SERVER_OTEL_TRACE_ID_RATIO") // we do this to we get the tracer set globally, which is needed by some of the otel // integrations for the database before start _, err := telemetry.InitTracer(&telemetry.TracerOpts{ ServiceName: svcName, CollectorURL: collectorURL, + TraceIdRatio: traceIdRatio, }) if err != nil { @@ -101,6 +103,7 @@ func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, er shutdown, err := telemetry.InitTracer(&telemetry.TracerOpts{ ServiceName: sc.OpenTelemetry.ServiceName, CollectorURL: sc.OpenTelemetry.CollectorURL, + TraceIdRatio: sc.OpenTelemetry.TraceIdRatio, }) if err != nil { return nil, fmt.Errorf("could not initialize tracer: %w", err) diff --git a/internal/msgqueue/rabbitmq/rabbitmq.go b/internal/msgqueue/rabbitmq/rabbitmq.go index 982aff7a6..9a698dfd2 100644 --- a/internal/msgqueue/rabbitmq/rabbitmq.go +++ b/internal/msgqueue/rabbitmq/rabbitmq.go @@ -41,6 +41,8 @@ type MessageQueueImpl struct { msgs chan *msgWithQueue identity string + qos int + l *zerolog.Logger ready bool @@ -58,6 +60,7 @@ type MessageQueueImplOpt func(*MessageQueueImplOpts) type MessageQueueImplOpts struct { l *zerolog.Logger url string + qos int } func defaultMessageQueueImplOpts() *MessageQueueImplOpts { @@ -80,6 +83,12 @@ func WithURL(url string) MessageQueueImplOpt { } } +func WithQos(qos int) MessageQueueImplOpt { + return func(opts *MessageQueueImplOpts) { + opts.qos = qos + } +} + // New creates a new MessageQueueImpl. func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) { ctx, cancel := context.WithCancel(context.Background()) @@ -97,6 +106,7 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) { ctx: ctx, identity: identity(), l: opts.l, + qos: opts.qos, } constructor := func(context.Context) (*amqp.Connection, error) { @@ -407,7 +417,7 @@ func (t *MessageQueueImpl) subscribe( } // We'd like to limit to 1k TPS per engine. The max channels on an instance is 10. - err = sub.Qos(100, 0, false) + err = sub.Qos(t.qos, 0, false) if err != nil { t.l.Error().Msgf("cannot set qos: %v", err) diff --git a/internal/msgqueue/rabbitmq/rabbitmq_test.go b/internal/msgqueue/rabbitmq/rabbitmq_test.go index 89f419573..fc7c257d3 100644 --- a/internal/msgqueue/rabbitmq/rabbitmq_test.go +++ b/internal/msgqueue/rabbitmq/rabbitmq_test.go @@ -29,6 +29,7 @@ func TestMessageQueueIntegration(t *testing.T) { // Initialize the task queue implementation cleanup, tq := rabbitmq.New( rabbitmq.WithURL(url), + rabbitmq.WithQos(100), ) defer cleanup() // nolint: errcheck @@ -111,6 +112,7 @@ func TestDeadLetteringSuccess(t *testing.T) { // Initialize the task queue implementation cleanup, tq := rabbitmq.New( rabbitmq.WithURL(url), + rabbitmq.WithQos(100), ) defer cleanup() // nolint: errcheck @@ -169,6 +171,7 @@ func TestDeadLetteringExceedRetriesFailure(t *testing.T) { // Initialize the task queue implementation cleanup, tq := rabbitmq.New( rabbitmq.WithURL(url), + rabbitmq.WithQos(100), ) defer cleanup() // nolint: errcheck diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index f7f364863..793c20256 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -3,6 +3,7 @@ package telemetry import ( "context" "fmt" + "strconv" "strings" "time" @@ -21,6 +22,7 @@ type TracerOpts struct { ServiceName string CollectorURL string Insecure bool + TraceIdRatio string } func InitTracer(opts *TracerOpts) (func(context.Context) error, error) { @@ -63,9 +65,19 @@ func InitTracer(opts *TracerOpts) (func(context.Context) error, error) { return nil, fmt.Errorf("failed to set resources: %w", err) } + var traceIdRatio float64 = 1 + + if opts.TraceIdRatio != "" { + traceIdRatio, err = strconv.ParseFloat(opts.TraceIdRatio, 64) + + if err != nil { + return nil, fmt.Errorf("failed to parse traceIdRatio: %w", err) + } + } + otel.SetTracerProvider( sdktrace.NewTracerProvider( - sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSampler(sdktrace.TraceIDRatioBased(traceIdRatio)), sdktrace.WithBatcher(exporter), sdktrace.WithResource(resources), ), diff --git a/pkg/config/loader/loader.go b/pkg/config/loader/loader.go index bdc65849e..52811f8ac 100644 --- a/pkg/config/loader/loader.go +++ b/pkg/config/loader/loader.go @@ -182,7 +182,7 @@ func GetDatabaseConfigFromConfigFile(cf *database.ConfigFile, runtime *server.Co }, Pool: pool, APIRepository: prisma.NewAPIRepository(c, pool, prisma.WithLogger(&l), prisma.WithCache(ch), prisma.WithMetered(meter)), - EngineRepository: prisma.NewEngineRepository(pool, prisma.WithLogger(&l), prisma.WithCache(ch), prisma.WithMetered(meter)), + EngineRepository: prisma.NewEngineRepository(pool, runtime, prisma.WithLogger(&l), prisma.WithCache(ch), prisma.WithMetered(meter)), EntitlementRepository: entitlementRepo, Seed: cf.Seed, }, nil @@ -220,6 +220,7 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF cleanup1, mq = rabbitmq.New( rabbitmq.WithURL(cf.MessageQueue.RabbitMQ.URL), rabbitmq.WithLogger(&l), + rabbitmq.WithQos(cf.MessageQueue.RabbitMQ.Qos), ) ing, err = ingestor.NewIngestor( diff --git a/pkg/config/server/server.go b/pkg/config/server/server.go index 973b71850..de3a15103 100644 --- a/pkg/config/server/server.go +++ b/pkg/config/server/server.go @@ -84,6 +84,9 @@ type ConfigFileRuntime struct { // Default limit values Limits LimitConfigFile `mapstructure:"limits" json:"limits,omitempty"` + // RequeueLimit is the number of times a message will be requeued in each attempt + RequeueLimit int `mapstructure:"requeueLimit" json:"requeueLimit,omitempty" default:"100"` + // Allow new tenants to be created AllowSignup bool `mapstructure:"allowSignup" json:"allowSignup,omitempty" default:"true"` @@ -267,6 +270,7 @@ type MessageQueueConfigFile struct { type RabbitMQConfigFile struct { URL string `mapstructure:"url" json:"url,omitempty" validate:"required" default:"amqp://user:password@localhost:5672/"` + Qos int `mapstructure:"qos" json:"qos,omitempty" default:"100"` } type ConfigFileEmail struct { @@ -448,6 +452,10 @@ func BindAllEnv(v *viper.Viper) { _ = v.BindEnv("msgQueue.kind", "SERVER_MSGQUEUE_KIND") _ = v.BindEnv("msgQueue.rabbitmq.url", "SERVER_MSGQUEUE_RABBITMQ_URL") + // throughput options + _ = v.BindEnv("msgQueue.rabbitmq.qos", "SERVER_MSGQUEUE_RABBITMQ_QOS") + _ = v.BindEnv("runtime.requeueLimit", "SERVER_REQUEUE_LIMIT") + // tls options _ = v.BindEnv("tls.tlsStrategy", "SERVER_TLS_STRATEGY") _ = v.BindEnv("tls.tlsCert", "SERVER_TLS_CERT") @@ -465,6 +473,7 @@ func BindAllEnv(v *viper.Viper) { // otel options _ = v.BindEnv("otel.serviceName", "SERVER_OTEL_SERVICE_NAME") _ = v.BindEnv("otel.collectorURL", "SERVER_OTEL_COLLECTOR_URL") + _ = v.BindEnv("otel.traceIdRatio", "SERVER_OTEL_TRACE_ID_RATIO") // tenant alerting options _ = v.BindEnv("tenantAlerting.slack.enabled", "SERVER_TENANT_ALERTING_SLACK_ENABLED") diff --git a/pkg/config/shared/shared.go b/pkg/config/shared/shared.go index a91150faa..2c9b0e02a 100644 --- a/pkg/config/shared/shared.go +++ b/pkg/config/shared/shared.go @@ -22,4 +22,5 @@ type LoggerConfigFile struct { type OpenTelemetryConfigFile struct { CollectorURL string `mapstructure:"collectorURL" json:"collectorURL,omitempty"` ServiceName string `mapstructure:"serviceName" json:"serviceName,omitempty" default:"server"` + TraceIdRatio string `mapstructure:"traceIdRatio" json:"traceIdRatio,omitempty" default:"1"` } diff --git a/pkg/repository/prisma/repository.go b/pkg/repository/prisma/repository.go index 7b9f9186a..9bb74fbee 100644 --- a/pkg/repository/prisma/repository.go +++ b/pkg/repository/prisma/repository.go @@ -276,7 +276,7 @@ func (r *engineRepository) WebhookWorker() repository.WebhookWorkerEngineReposit return r.webhookWorker } -func NewEngineRepository(pool *pgxpool.Pool, fs ...PrismaRepositoryOpt) repository.EngineRepository { +func NewEngineRepository(pool *pgxpool.Pool, cf *server.ConfigFileRuntime, fs ...PrismaRepositoryOpt) repository.EngineRepository { opts := defaultPrismaRepositoryOpts() for _, f := range fs { @@ -297,7 +297,7 @@ func NewEngineRepository(pool *pgxpool.Pool, fs ...PrismaRepositoryOpt) reposito event: NewEventEngineRepository(pool, opts.v, opts.l, opts.metered), getGroupKeyRun: NewGetGroupKeyRunRepository(pool, opts.v, opts.l), jobRun: NewJobRunEngineRepository(pool, opts.v, opts.l), - stepRun: NewStepRunEngineRepository(pool, opts.v, opts.l), + stepRun: NewStepRunEngineRepository(pool, opts.v, opts.l, cf), tenant: NewTenantEngineRepository(pool, opts.v, opts.l, opts.cache), tenantAlerting: NewTenantAlertingEngineRepository(pool, opts.v, opts.l, opts.cache), ticker: NewTickerRepository(pool, opts.v, opts.l), diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index cd9ff8da3..addcc7b15 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -15,6 +15,7 @@ import ( "github.com/rs/zerolog" "github.com/hatchet-dev/hatchet/internal/telemetry" + "github.com/hatchet-dev/hatchet/pkg/config/server" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/prisma/db" "github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc" @@ -200,9 +201,10 @@ type stepRunEngineRepository struct { v validator.Validator l *zerolog.Logger queries *dbsqlc.Queries + cf *server.ConfigFileRuntime } -func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger) repository.StepRunEngineRepository { +func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, cf *server.ConfigFileRuntime) repository.StepRunEngineRepository { queries := dbsqlc.New() return &stepRunEngineRepository{ @@ -210,6 +212,7 @@ func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *ze v: v, l: l, queries: queries, + cf: cf, } } @@ -332,8 +335,8 @@ func (s *stepRunEngineRepository) ListStepRunsToRequeue(ctx context.Context, ten return nil, err } - if limit > 100 { - limit = 100 + if limit > int32(s.cf.RequeueLimit) { + limit = int32(s.cf.RequeueLimit) } // get the step run and make sure it's still in pending