diff --git a/Taskfile.yaml b/Taskfile.yaml index 31fad59a2..ddffa51ea 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -249,3 +249,6 @@ tasks: EXTRA_ARGS: '{{.EXTRA_ARGS | default ""}}' cmds: - poetry run pytest -n auto --retries 3 --retry-delay 5 + start-telemetry: + cmds: + - docker compose -f docker-compose.infra.yml up -d diff --git a/api/v1/server/handlers/v1/webhooks/receive.go b/api/v1/server/handlers/v1/webhooks/receive.go index 8de1d2fd6..4d0800c6d 100644 --- a/api/v1/server/handlers/v1/webhooks/receive.go +++ b/api/v1/server/handlers/v1/webhooks/receive.go @@ -124,7 +124,8 @@ func (w *V1WebhooksService) V1WebhookReceive(ctx echo.Context, request gen.V1Web * See: https://api.slack.com/interactivity/slash-commands * For GENERIC webhooks, we convert all form fields directly to the payload map */ - if webhook.SourceName == sqlcv1.V1IncomingWebhookSourceNameSLACK { + switch webhook.SourceName { + case sqlcv1.V1IncomingWebhookSourceNameSLACK: payloadValue := formData.Get("payload") if payloadValue != "" { /* Interactive components: parse the payload parameter as JSON */ @@ -153,14 +154,14 @@ func (w *V1WebhooksService) V1WebhookReceive(ctx echo.Context, request gen.V1Web } } } - } else if webhook.SourceName == sqlcv1.V1IncomingWebhookSourceNameGENERIC { + case sqlcv1.V1IncomingWebhookSourceNameGENERIC: /* For GENERIC webhooks, convert all form fields to the payload map */ for key, values := range formData { if len(values) > 0 { payloadMap[key] = values[0] } } - } else { + default: /* For other webhook sources, form-encoded data is unexpected - return error */ return gen.V1WebhookReceive400JSONResponse{ Errors: []gen.APIError{ @@ -210,11 +211,13 @@ func (w *V1WebhooksService) V1WebhookReceive(ctx echo.Context, request gen.V1Web if err != nil { var errorMsg string - if strings.Contains(err.Error(), "did not evaluate to a string") { + errStr := err.Error() + switch { + case strings.Contains(errStr, "did not evaluate to a string"): errorMsg = "Event key expression must evaluate to a string" - } else if eventKey == "" { + case eventKey == "": errorMsg = "Event key evaluated to an empty string" - } else { + default: errorMsg = "Failed to evaluate event key expression" } diff --git a/cmd/hatchet-admin/cli/seed/seed-cypress.go b/cmd/hatchet-admin/cli/seed/seed-cypress.go index a5d180fdc..979603a37 100644 --- a/cmd/hatchet-admin/cli/seed/seed-cypress.go +++ b/cmd/hatchet-admin/cli/seed/seed-cypress.go @@ -354,7 +354,7 @@ export type SeededUser = (typeof seededUsers)[SeededUserKey]; return err } - if err := os.WriteFile(targetFile, []byte(contents), 0o644); err != nil { + if err := os.WriteFile(targetFile, []byte(contents), 0o600); err != nil { return err } diff --git a/cmd/hatchet-engine/engine/run.go b/cmd/hatchet-engine/engine/run.go index 97b516f56..7edaf10f4 100644 --- a/cmd/hatchet-engine/engine/run.go +++ b/cmd/hatchet-engine/engine/run.go @@ -14,6 +14,7 @@ import ( adminv1 "github.com/hatchet-dev/hatchet/internal/services/admin/v1" "github.com/hatchet-dev/hatchet/internal/services/controllers/events" "github.com/hatchet-dev/hatchet/internal/services/controllers/jobs" + metricscontroller "github.com/hatchet-dev/hatchet/internal/services/controllers/metrics" "github.com/hatchet-dev/hatchet/internal/services/controllers/retention" "github.com/hatchet-dev/hatchet/internal/services/controllers/v1/olap" "github.com/hatchet-dev/hatchet/internal/services/controllers/v1/task" @@ -70,6 +71,18 @@ func init() { if err != nil { panic(fmt.Errorf("could not initialize tracer: %w", err)) } + + // Initialize the meter provider for metrics + _, err = telemetry.InitMeter(&telemetry.TracerOpts{ + ServiceName: svcName, + CollectorURL: collectorURL, + Insecure: insecureBool, + CollectorAuth: collectorAuth, + }) + + if err != nil { + panic(fmt.Errorf("could not initialize meter: %w", err)) + } } func Run(ctx context.Context, cf *loader.ConfigLoader, version string) error { @@ -890,6 +903,31 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro Name: "webhook worker", Fn: cleanup2, }) + + if sc.OpenTelemetry.MetricsEnabled && sc.OpenTelemetry.CollectorURL != "" { + mc, err := metricscontroller.New( + metricscontroller.WithLogger(sc.Logger), + metricscontroller.WithRepository(sc.V1), + metricscontroller.WithAlerter(sc.Alerter), + metricscontroller.WithPartition(p), + metricscontroller.WithIntervals(sc.CronOperations), + ) + if err != nil { + return nil, fmt.Errorf("could not create metrics collector: %w", err) + } + + cleanupMetrics, err := mc.Start() + if err != nil { + return nil, fmt.Errorf("could not start metrics collector: %w", err) + } + + teardown = append(teardown, Teardown{ + Name: "metrics collector", + Fn: cleanupMetrics, + }) + + l.Info().Msg("metrics collector started") + } } if sc.HasService("all") || sc.HasService("grpc-api") { diff --git a/frontend/docs/pages/self-hosting/configuration-options.mdx b/frontend/docs/pages/self-hosting/configuration-options.mdx index d64c06bf0..8aa7775be 100644 --- a/frontend/docs/pages/self-hosting/configuration-options.mdx +++ b/frontend/docs/pages/self-hosting/configuration-options.mdx @@ -307,6 +307,7 @@ Variables marked with ⚠️ are conditionally required when specific features a | `SERVER_OTEL_INSECURE` | Whether to use an insecure connection to the collector URL | | | `SERVER_OTEL_TRACE_ID_RATIO` | OpenTelemetry trace ID ratio | | | `SERVER_OTEL_COLLECTOR_AUTH` | OpenTelemetry Collector Authorization header value | | +| `SERVER_OTEL_METRICS_ENABLED` | Enable OpenTelemetry metrics collection | `false` | | `SERVER_PROMETHEUS_ENABLED` | Enable Prometheus | `false` | | `SERVER_PROMETHEUS_ADDRESS` | Prometheus address | `:9090` | | `SERVER_PROMETHEUS_PATH` | Prometheus metrics path | `/metrics` | @@ -338,36 +339,41 @@ Variables marked with ⚠️ are conditionally required when specific features a ## Cron Operations Configuration -| Variable | Description | Default Value | -| --------------------------------------------------------- | ----------------------------------------------------- | ------------- | -| `SERVER_CRON_OPERATIONS_TASK_ANALYZE_CRON_INTERVAL` | Interval for running ANALYZE on task-related tables | `3h` | -| `SERVER_CRON_OPERATIONS_OLAP_ANALYZE_CRON_INTERVAL` | Interval for running ANALYZE on OLAP/analytics tables | `3h` | -| `SERVER_WAIT_FOR_FLUSH` | Default wait for flush | `1ms` | -| `SERVER_MAX_CONCURRENT` | Default max concurrent | `50` | -| `SERVER_FLUSH_PERIOD_MILLISECONDS` | Default flush period | `10ms` | -| `SERVER_FLUSH_ITEMS_THRESHOLD` | Default flush threshold | `100` | -| `SERVER_FLUSH_STRATEGY` | Default flush strategy | `DYNAMIC` | -| `SERVER_WORKFLOWRUNBUFFER_WAIT_FOR_FLUSH` | Workflow run buffer wait for flush | | -| `SERVER_WORKFLOWRUNBUFFER_MAX_CONCURRENT` | Max concurrent workflow run buffer ops | | -| `SERVER_WORKFLOWRUNBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for workflow run buffer | | -| `SERVER_WORKFLOWRUNBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for workflow run buffer | | -| `SERVER_WORKFLOWRUNBUFFER_FLUSH_STRATEGY` | Flush strategy for workflow run buffer | | -| `SERVER_EVENTBUFFER_WAIT_FOR_FLUSH` | Event buffer wait for flush | | -| `SERVER_EVENTBUFFER_MAX_CONCURRENT` | Max concurrent event buffer ops | | -| `SERVER_EVENTBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for event buffer | | -| `SERVER_EVENTBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for event buffer | | -| `SERVER_EVENTBUFFER_SERIAL_BUFFER` | Event buffer serial mode | | -| `SERVER_EVENTBUFFER_FLUSH_STRATEGY` | Flush strategy for event buffer | | -| `SERVER_RELEASESEMAPHOREBUFFER_WAIT_FOR_FLUSH` | Release semaphore buffer wait for flush | | -| `SERVER_RELEASESEMAPHOREBUFFER_MAX_CONCURRENT` | Max concurrent release semaphore buffer ops | | -| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for release semaphore buffer | | -| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for release semaphore buffer | | -| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_STRATEGY` | Flush strategy for release semaphore buffer | | -| `SERVER_QUEUESTEPRUNBUFFER_WAIT_FOR_FLUSH` | Queue step run buffer wait for flush | | -| `SERVER_QUEUESTEPRUNBUFFER_MAX_CONCURRENT` | Max concurrent queue step run buffer ops | | -| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for queue step run buffer | | -| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for queue step run buffer | | -| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_STRATEGY` | Flush strategy for queue step run buffer | | +| Variable | Description | Default Value | +| --------------------------------------------------------- | --------------------------------------------------------------------- | ------------- | +| `SERVER_CRON_OPERATIONS_TASK_ANALYZE_CRON_INTERVAL` | Interval for running ANALYZE on task-related tables | `3h` | +| `SERVER_CRON_OPERATIONS_OLAP_ANALYZE_CRON_INTERVAL` | Interval for running ANALYZE on OLAP/analytics tables | `3h` | +| `SERVER_CRON_OPERATIONS_DB_HEALTH_METRICS_INTERVAL` | Interval for collecting database health metrics (OTel) | `60s` | +| `SERVER_CRON_OPERATIONS_OLAP_METRICS_INTERVAL` | Interval for collecting OLAP metrics (OTel) | `5m` | +| `SERVER_CRON_OPERATIONS_WORKER_METRICS_INTERVAL` | Interval for collecting worker metrics (OTel) | `60s` | +| `SERVER_CRON_OPERATIONS_YESTERDAY_RUN_COUNT_HOUR` | Hour (0-23) at which to collect yesterday's workflow run count (OTel) | `0` | +| `SERVER_CRON_OPERATIONS_YESTERDAY_RUN_COUNT_MINUTE` | Minute (0-59) at which to collect yesterday's workflow run count | `5` | +| `SERVER_WAIT_FOR_FLUSH` | Default wait for flush | `1ms` | +| `SERVER_MAX_CONCURRENT` | Default max concurrent | `50` | +| `SERVER_FLUSH_PERIOD_MILLISECONDS` | Default flush period | `10ms` | +| `SERVER_FLUSH_ITEMS_THRESHOLD` | Default flush threshold | `100` | +| `SERVER_FLUSH_STRATEGY` | Default flush strategy | `DYNAMIC` | +| `SERVER_WORKFLOWRUNBUFFER_WAIT_FOR_FLUSH` | Workflow run buffer wait for flush | | +| `SERVER_WORKFLOWRUNBUFFER_MAX_CONCURRENT` | Max concurrent workflow run buffer ops | | +| `SERVER_WORKFLOWRUNBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for workflow run buffer | | +| `SERVER_WORKFLOWRUNBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for workflow run buffer | | +| `SERVER_WORKFLOWRUNBUFFER_FLUSH_STRATEGY` | Flush strategy for workflow run buffer | | +| `SERVER_EVENTBUFFER_WAIT_FOR_FLUSH` | Event buffer wait for flush | | +| `SERVER_EVENTBUFFER_MAX_CONCURRENT` | Max concurrent event buffer ops | | +| `SERVER_EVENTBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for event buffer | | +| `SERVER_EVENTBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for event buffer | | +| `SERVER_EVENTBUFFER_SERIAL_BUFFER` | Event buffer serial mode | | +| `SERVER_EVENTBUFFER_FLUSH_STRATEGY` | Flush strategy for event buffer | | +| `SERVER_RELEASESEMAPHOREBUFFER_WAIT_FOR_FLUSH` | Release semaphore buffer wait for flush | | +| `SERVER_RELEASESEMAPHOREBUFFER_MAX_CONCURRENT` | Max concurrent release semaphore buffer ops | | +| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for release semaphore buffer | | +| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for release semaphore buffer | | +| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_STRATEGY` | Flush strategy for release semaphore buffer | | +| `SERVER_QUEUESTEPRUNBUFFER_WAIT_FOR_FLUSH` | Queue step run buffer wait for flush | | +| `SERVER_QUEUESTEPRUNBUFFER_MAX_CONCURRENT` | Max concurrent queue step run buffer ops | | +| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for queue step run buffer | | +| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for queue step run buffer | | +| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_STRATEGY` | Flush strategy for queue step run buffer | | ## OLAP Database Configuration diff --git a/go.mod b/go.mod index 29686c7a7..1f7bde029 100644 --- a/go.mod +++ b/go.mod @@ -37,9 +37,11 @@ require ( go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.64.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.64.0 go.opentelemetry.io/otel v1.39.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 go.opentelemetry.io/otel/sdk v1.39.0 + go.opentelemetry.io/otel/sdk/metric v1.39.0 go.opentelemetry.io/otel/trace v1.39.0 go.uber.org/goleak v1.3.0 golang.org/x/time v0.14.0 diff --git a/go.sum b/go.sum index cca70c9f2..88f243914 100644 --- a/go.sum +++ b/go.sum @@ -413,6 +413,8 @@ go.opentelemetry.io/contrib/propagators/b3 v1.39.0 h1:PI7pt9pkSnimWcp5sQhUA9OzLb go.opentelemetry.io/contrib/propagators/b3 v1.39.0/go.mod h1:5gV/EzPnfYIwjzj+6y8tbGW2PKWhcsz5e/7twptRVQY= go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0 h1:cEf8jF6WbuGQWUVcqgyWtTR0kOOAWY1DYZ+UhvdmQPw= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0/go.mod h1:k1lzV5n5U3HkGvTCJHraTAGJ7MqsgL1wrGwTj1Isfiw= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 h1:f0cb2XPmrqn4XMy9PNliTgRKJgS5WcL/u0/WRYGz4t0= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0/go.mod h1:vnakAaFckOMiMtOIhFI2MNH4FYrZzXCYxmb1LlhoGz8= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 h1:in9O8ESIOlwJAEGTkkf34DesGRAc/Pn8qJ7k3r/42LM= diff --git a/internal/msgqueue/v1/rabbitmq/rabbitmq.go b/internal/msgqueue/v1/rabbitmq/rabbitmq.go index f6620bf89..53a4a2b69 100644 --- a/internal/msgqueue/v1/rabbitmq/rabbitmq.go +++ b/internal/msgqueue/v1/rabbitmq/rabbitmq.go @@ -455,18 +455,6 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg return nil } -func getMessageSize(m *msgqueue.Message) int { - payloadSize := getPayloadSize(m.Payloads) - - size := payloadSize + len(m.TenantID) + len(m.ID) + 4 // 4 bytes for other fields - - for k, v := range m.OtelCarrier { - size += len(k) + len(v) - } - - return size -} - // Subscribe subscribes to the msg queue. func (t *MessageQueueImpl) Subscribe( q msgqueue.Queue, diff --git a/internal/services/controllers/metrics/collector.go b/internal/services/controllers/metrics/collector.go new file mode 100644 index 000000000..700df0734 --- /dev/null +++ b/internal/services/controllers/metrics/collector.go @@ -0,0 +1,508 @@ +package metrics + +import ( + "context" + "fmt" + "time" + + "github.com/go-co-op/gocron/v2" + "github.com/rs/zerolog" + + "github.com/hatchet-dev/hatchet/internal/services/partition" + "github.com/hatchet-dev/hatchet/pkg/config/server" + hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors" + "github.com/hatchet-dev/hatchet/pkg/logger" + v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" +) + +// MetricsCollector collects and reports database and system metrics to OTel +type MetricsCollector interface { + Start() (func() error, error) +} + +type MetricsCollectorImpl struct { + l *zerolog.Logger + repo v1.Repository + recorder *telemetry.MetricsRecorder + s gocron.Scheduler + a *hatcheterrors.Wrapped + p *partition.Partition + dbHealthInterval time.Duration + olapInterval time.Duration + workerInterval time.Duration + yesterdayRunCountHour uint + yesterdayRunCountMinute uint +} + +type MetricsCollectorOpt func(*MetricsCollectorOpts) + +type MetricsCollectorOpts struct { + l *zerolog.Logger + repo v1.Repository + alerter hatcheterrors.Alerter + p *partition.Partition + dbHealthInterval time.Duration + olapInterval time.Duration + workerInterval time.Duration + yesterdayRunCountHour uint + yesterdayRunCountMinute uint +} + +func defaultMetricsCollectorOpts() *MetricsCollectorOpts { + l := logger.NewDefaultLogger("metrics-collector") + alerter := hatcheterrors.NoOpAlerter{} + + return &MetricsCollectorOpts{ + l: &l, + alerter: alerter, + dbHealthInterval: 60 * time.Second, + olapInterval: 5 * time.Minute, + workerInterval: 60 * time.Second, + yesterdayRunCountHour: 0, + yesterdayRunCountMinute: 5, + } +} + +func WithLogger(l *zerolog.Logger) MetricsCollectorOpt { + return func(opts *MetricsCollectorOpts) { + opts.l = l + } +} + +func WithRepository(r v1.Repository) MetricsCollectorOpt { + return func(opts *MetricsCollectorOpts) { + opts.repo = r + } +} + +func WithAlerter(a hatcheterrors.Alerter) MetricsCollectorOpt { + return func(opts *MetricsCollectorOpts) { + opts.alerter = a + } +} + +func WithPartition(p *partition.Partition) MetricsCollectorOpt { + return func(opts *MetricsCollectorOpts) { + opts.p = p + } +} + +func WithIntervals(config server.CronOperationsConfigFile) MetricsCollectorOpt { + return func(opts *MetricsCollectorOpts) { + opts.dbHealthInterval = config.DBHealthMetricsInterval + opts.olapInterval = config.OLAPMetricsInterval + opts.workerInterval = config.WorkerMetricsInterval + opts.yesterdayRunCountHour = config.YesterdayRunCountHour + opts.yesterdayRunCountMinute = config.YesterdayRunCountMinute + } +} + +func New(fs ...MetricsCollectorOpt) (*MetricsCollectorImpl, error) { + opts := defaultMetricsCollectorOpts() + + for _, f := range fs { + f(opts) + } + + if opts.repo == nil { + return nil, fmt.Errorf("repository is required. use WithRepository") + } + + if opts.p == nil { + return nil, fmt.Errorf("partition is required. use WithPartition") + } + + newLogger := opts.l.With().Str("service", "metrics-collector").Logger() + opts.l = &newLogger + + recorder, err := telemetry.NewMetricsRecorder(context.Background()) + if err != nil { + return nil, fmt.Errorf("could not create metrics recorder: %w", err) + } + + s, err := gocron.NewScheduler(gocron.WithLocation(time.UTC)) + if err != nil { + return nil, fmt.Errorf("could not create scheduler: %w", err) + } + + a := hatcheterrors.NewWrapped(opts.alerter) + a.WithData(map[string]interface{}{"service": "metrics-collector"}) + + return &MetricsCollectorImpl{ + l: opts.l, + repo: opts.repo, + recorder: recorder, + s: s, + a: a, + p: opts.p, + dbHealthInterval: opts.dbHealthInterval, + olapInterval: opts.olapInterval, + workerInterval: opts.workerInterval, + yesterdayRunCountHour: opts.yesterdayRunCountHour, + yesterdayRunCountMinute: opts.yesterdayRunCountMinute, + }, nil +} + +func (mc *MetricsCollectorImpl) Start() (func() error, error) { + mc.s.Start() + + ctx := context.Background() + + // Collect database health metrics + _, err := mc.s.NewJob( + gocron.DurationJob(mc.dbHealthInterval), + gocron.NewTask(mc.collectDatabaseHealthMetrics(ctx)), + gocron.WithSingletonMode(gocron.LimitModeReschedule), + ) + if err != nil { + return nil, fmt.Errorf("could not schedule database health metrics collection: %w", err) + } + mc.l.Info().Str("interval", mc.dbHealthInterval.String()).Msg("scheduled database health metrics collection") + + // Collect OLAP metrics + _, err = mc.s.NewJob( + gocron.DurationJob(mc.olapInterval), + gocron.NewTask(mc.collectOLAPMetrics(ctx)), + gocron.WithSingletonMode(gocron.LimitModeReschedule), + ) + if err != nil { + return nil, fmt.Errorf("could not schedule OLAP metrics collection: %w", err) + } + mc.l.Info().Str("interval", mc.olapInterval.String()).Msg("scheduled OLAP metrics collection") + + // Collect worker metrics + _, err = mc.s.NewJob( + gocron.DurationJob(mc.workerInterval), + gocron.NewTask(mc.collectWorkerMetrics(ctx)), + gocron.WithSingletonMode(gocron.LimitModeReschedule), + ) + if err != nil { + return nil, fmt.Errorf("could not schedule worker metrics collection: %w", err) + } + mc.l.Info().Str("interval", mc.workerInterval.String()).Msg("scheduled worker metrics collection") + + // Collect yesterday's run count once per day + _, err = mc.s.NewJob( + gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(mc.yesterdayRunCountHour, mc.yesterdayRunCountMinute, 0))), + gocron.NewTask(mc.collectYesterdayRunCounts(ctx)), + gocron.WithSingletonMode(gocron.LimitModeReschedule), + ) + if err != nil { + return nil, fmt.Errorf("could not schedule yesterday run counts collection: %w", err) + } + mc.l.Info().Uint("hour", mc.yesterdayRunCountHour).Uint("minute", mc.yesterdayRunCountMinute).Msg("scheduled yesterday run counts collection") + + cleanup := func() error { + if err := mc.s.Shutdown(); err != nil { + return fmt.Errorf("could not shutdown scheduler: %w", err) + } + return nil + } + + return cleanup, nil +} + +func (mc *MetricsCollectorImpl) collectDatabaseHealthMetrics(ctx context.Context) func() { + return func() { + ctx, span := telemetry.NewSpan(ctx, "collect database_health_metrics") + defer span.End() + + // Only run on the engine instance that has control over the internal tenant + tenant, err := mc.p.GetInternalTenantForController(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("could not get internal tenant") + return + } + + if tenant == nil { + // This engine instance doesn't have control over the internal tenant + return + } + + mc.l.Debug().Msg("collecting database health metrics") + + // Check if track_counts is enabled + trackCountsEnabled, err := mc.repo.PGHealth().TrackCountsEnabled(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to check track_counts setting") + } else if !trackCountsEnabled { + mc.l.Error().Msg("track_counts is disabled - database health metrics require track_counts = on. Run 'ALTER SYSTEM SET track_counts = on; SELECT pg_reload_conf();' to enable it.") + } + + // Check bloat + bloatStatus, bloatCount, err := mc.repo.PGHealth().CheckBloat(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to check database bloat") + } else { + mc.recorder.RecordDBBloat(ctx, int64(bloatCount), string(bloatStatus)) + mc.l.Debug().Int("count", bloatCount).Str("status", string(bloatStatus)).Msg("recorded database bloat metric") + } + + // Get detailed bloat metrics per table + bloatDetails, err := mc.repo.PGHealth().GetBloatDetails(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to get bloat details") + } else if len(bloatDetails) > 0 { + mc.l.Info().Int("table_count", len(bloatDetails)).Msg("recording bloat details per table") + for _, row := range bloatDetails { + if row.DeadPct.Valid { + deadPct, err := row.DeadPct.Float64Value() + if err == nil { + tableName := row.Tablename.String + mc.recorder.RecordDBBloatPercent(ctx, tableName, deadPct.Float64) + mc.l.Debug(). + Str("table", tableName). + Float64("dead_pct", deadPct.Float64). + Msg("recorded bloat percent metric") + } + } + } + } + + // Check long-running queries + _, longRunningCount, err := mc.repo.PGHealth().CheckLongRunningQueries(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to check long-running queries") + } else { + mc.recorder.RecordDBLongRunningQueries(ctx, int64(longRunningCount)) + mc.l.Debug().Int("count", longRunningCount).Msg("recorded long-running queries metric") + } + + // Check query cache hit ratios + tables, err := mc.repo.PGHealth().CheckQueryCaches(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to check query cache") + } else if len(tables) == 0 { + mc.l.Info().Msg("no query cache data available (pg_stat_statements may not be enabled or track_counts may be disabled)") + } else { + mc.l.Info().Int("table_count", len(tables)).Msg("recording query cache hit ratios") + for _, table := range tables { + tableName := table.Tablename.String + hitRatio := table.CacheHitRatioPct + mc.recorder.RecordDBQueryCacheHitRatio(ctx, tableName, hitRatio) + mc.l.Debug(). + Str("table", tableName). + Float64("hit_ratio", hitRatio). + Msg("recorded query cache hit ratio metric") + } + } + + // Check long-running vacuum + vacuumStatus, vacuumCount, err := mc.repo.PGHealth().CheckLongRunningVacuum(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to check long-running vacuum") + } else { + mc.recorder.RecordDBLongRunningVacuum(ctx, int64(vacuumCount), string(vacuumStatus)) + mc.l.Debug().Int("count", vacuumCount).Str("status", string(vacuumStatus)).Msg("recorded long-running vacuum metric") + } + + autovacuumRows, err := mc.repo.PGHealth().CheckLastAutovacuumForPartitionedTables(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to check last autovacuum for partitioned tables (OLAP DB)") + } else if len(autovacuumRows) == 0 { + mc.l.Warn().Msg("no partitioned tables found for autovacuum tracking (OLAP DB)") + } else { + mc.l.Info().Int("table_count", len(autovacuumRows)).Msg("recording last autovacuum metrics (OLAP DB)") + validCount := 0 + for _, row := range autovacuumRows { + if row.SecondsSinceLastAutovacuum.Valid { + seconds, err := row.SecondsSinceLastAutovacuum.Float64Value() + if err == nil { + tableName := row.Tablename.String + mc.recorder.RecordDBLastAutovacuumSecondsSince(ctx, tableName, seconds.Float64) + mc.l.Debug(). + Str("table", tableName). + Float64("seconds_since", seconds.Float64). + Msg("recorded last autovacuum metric (OLAP DB)") + validCount++ + } + } + } + if validCount == 0 { + mc.l.Warn().Int("table_count", len(autovacuumRows)).Msg("found partitioned tables but none have been autovacuumed yet (OLAP DB)") + } + } + + autovacuumRowsCoreDB, err := mc.repo.PGHealth().CheckLastAutovacuumForPartitionedTablesCoreDB(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to check last autovacuum for partitioned tables (CORE DB)") + } else if len(autovacuumRowsCoreDB) == 0 { + mc.l.Warn().Msg("no partitioned tables found for autovacuum tracking (CORE DB)") + } else { + mc.l.Info().Int("table_count", len(autovacuumRowsCoreDB)).Msg("recording last autovacuum metrics (CORE DB)") + validCount := 0 + for _, row := range autovacuumRowsCoreDB { + if row.SecondsSinceLastAutovacuum.Valid { + seconds, err := row.SecondsSinceLastAutovacuum.Float64Value() + if err == nil { + tableName := row.Tablename.String + mc.recorder.RecordDBLastAutovacuumSecondsSince(ctx, tableName, seconds.Float64) + mc.l.Debug(). + Str("table", tableName). + Float64("seconds_since", seconds.Float64). + Msg("recorded last autovacuum metric (CORE DB)") + validCount++ + } + } + } + if validCount == 0 { + mc.l.Warn().Int("table_count", len(autovacuumRowsCoreDB)).Msg("found partitioned tables but none have been autovacuumed yet (CORE DB)") + } + } + + mc.l.Debug().Msg("finished collecting database health metrics") + } +} + +func (mc *MetricsCollectorImpl) collectOLAPMetrics(ctx context.Context) func() { + return func() { + ctx, span := telemetry.NewSpan(ctx, "collect olap_metrics") + defer span.End() + + // Only run on the engine instance that has control over the internal tenant + tenant, err := mc.p.GetInternalTenantForController(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("could not get internal tenant") + return + } + + if tenant == nil { + // This engine instance doesn't have control over the internal tenant + return + } + + mc.l.Debug().Msg("collecting OLAP metrics") + + // Count DAG status updates temp table size (instance-wide) + dagSize, err := mc.repo.OLAP().CountOLAPTempTableSizeForDAGStatusUpdates(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to count DAG temp table size") + } else { + mc.recorder.RecordOLAPTempTableSizeDAG(ctx, dagSize) + mc.l.Debug().Int64("size", dagSize).Msg("recorded DAG temp table size metric") + } + + // Count task status updates temp table size (instance-wide) + taskSize, err := mc.repo.OLAP().CountOLAPTempTableSizeForTaskStatusUpdates(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to count task temp table size") + } else { + mc.recorder.RecordOLAPTempTableSizeTask(ctx, taskSize) + mc.l.Debug().Int64("size", taskSize).Msg("recorded task temp table size metric") + } + + mc.l.Debug().Msg("finished collecting OLAP metrics") + } +} + +func (mc *MetricsCollectorImpl) collectYesterdayRunCounts(ctx context.Context) func() { + return func() { + ctx, span := telemetry.NewSpan(ctx, "collect yesterday_run_counts") + defer span.End() + + // Only run on the engine instance that has control over the internal tenant + tenant, err := mc.p.GetInternalTenantForController(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("could not get internal tenant") + return + } + + if tenant == nil { + // This engine instance doesn't have control over the internal tenant + return + } + + mc.l.Debug().Msg("collecting yesterday's run counts") + + // Get yesterday's run counts by status (instance-wide) + runCounts, err := mc.repo.OLAP().ListYesterdayRunCountsByStatus(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("failed to get yesterday's run counts") + return + } + + for status, count := range runCounts { + mc.recorder.RecordYesterdayRunCount(ctx, string(status), count) + mc.l.Debug().Str("status", string(status)).Int64("count", count).Msg("recorded yesterday run count metric") + } + + mc.l.Debug().Msg("finished collecting yesterday's run counts") + } +} + +func (mc *MetricsCollectorImpl) collectWorkerMetrics(ctx context.Context) func() { + return func() { + ctx, span := telemetry.NewSpan(ctx, "collect worker_metrics") + defer span.End() + + // Only run on the engine instance that has control over the internal tenant + tenant, err := mc.p.GetInternalTenantForController(ctx) + if err != nil { + mc.l.Error().Err(err).Msg("could not get internal tenant") + return + } + + if tenant == nil { + // This engine instance doesn't have control over the internal tenant + return + } + + mc.l.Debug().Msg("collecting worker metrics") + + // Count active slots per tenant + activeSlots, err := mc.repo.Workers().CountActiveSlotsPerTenant() + if err != nil { + mc.l.Error().Err(err).Msg("failed to count active slots per tenant") + } else if len(activeSlots) == 0 { + mc.l.Debug().Msg("no active worker slots found") + } else { + mc.l.Info().Int("tenant_count", len(activeSlots)).Msg("recording active slots metrics") + for tenantId, count := range activeSlots { + mc.recorder.RecordActiveSlots(ctx, tenantId, count) + mc.l.Debug().Str("tenant_id", tenantId).Int64("count", count).Msg("recorded active slots metric") + } + } + + // Count active workers per tenant + activeWorkers, err := mc.repo.Workers().CountActiveWorkersPerTenant() + if err != nil { + mc.l.Error().Err(err).Msg("failed to count active workers per tenant") + } else if len(activeWorkers) == 0 { + mc.l.Debug().Msg("no active workers found") + } else { + mc.l.Info().Int("tenant_count", len(activeWorkers)).Msg("recording active workers metrics") + for tenantId, count := range activeWorkers { + mc.recorder.RecordActiveWorkers(ctx, tenantId, count) + mc.l.Debug().Str("tenant_id", tenantId).Int64("count", count).Msg("recorded active workers metric") + } + } + + // Count active SDKs per tenant + activeSDKs, err := mc.repo.Workers().ListActiveSDKsPerTenant() + if err != nil { + mc.l.Error().Err(err).Msg("failed to list active SDKs per tenant") + } else if len(activeSDKs) == 0 { + mc.l.Debug().Msg("no active SDKs found") + } else { + mc.l.Info().Int("sdk_count", len(activeSDKs)).Msg("recording active SDKs metrics") + for tuple, count := range activeSDKs { + sdkInfo := telemetry.SDKInfo{ + OperatingSystem: tuple.SDK.OperatingSystem, + Language: tuple.SDK.Language, + LanguageVersion: tuple.SDK.LanguageVersion, + SdkVersion: tuple.SDK.SdkVersion, + } + mc.recorder.RecordActiveSDKs(ctx, tuple.TenantId, sdkInfo, count) + mc.l.Debug(). + Str("tenant_id", tuple.TenantId). + Int64("count", count). + Str("sdk_language", sdkInfo.Language). + Str("sdk_version", sdkInfo.SdkVersion). + Msg("recorded active SDKs metric") + } + } + + mc.l.Debug().Msg("finished collecting worker metrics") + } +} diff --git a/internal/services/controllers/v1/task/process_payload_wal.go b/internal/services/controllers/v1/task/process_payload_wal.go index a3630b907..e80d59d4e 100644 --- a/internal/services/controllers/v1/task/process_payload_wal.go +++ b/internal/services/controllers/v1/task/process_payload_wal.go @@ -3,8 +3,9 @@ package task import ( "context" - "github.com/hatchet-dev/hatchet/pkg/telemetry" "go.opentelemetry.io/otel/codes" + + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) func (tc *TasksControllerImpl) processPayloadExternalCutovers(ctx context.Context) func() { diff --git a/internal/services/dispatcher/dispatcher_v1.go b/internal/services/dispatcher/dispatcher_v1.go index b2f47fb2f..f1b8f8c4b 100644 --- a/internal/services/dispatcher/dispatcher_v1.go +++ b/internal/services/dispatcher/dispatcher_v1.go @@ -231,12 +231,13 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms ) if err != nil { - multiErr = multierror.Append( - multiErr, - fmt.Errorf("could not create monitoring event for task %d: %w", task.ID, err), - ) + d.l.Error().Err(err).Int64("task_id", task.ID).Msg("could not create monitoring event") } else { - defer d.pubBuffer.Pub(ctx, msgqueuev1.OLAP_QUEUE, msg, false) + defer func() { + if err := d.pubBuffer.Pub(ctx, msgqueuev1.OLAP_QUEUE, msg, false); err != nil { + d.l.Error().Err(err).Msg("could not publish monitoring event") + } + }() } return nil diff --git a/pkg/client/dispatcher.go b/pkg/client/dispatcher.go index adbd44371..73e82cb89 100644 --- a/pkg/client/dispatcher.go +++ b/pkg/client/dispatcher.go @@ -11,9 +11,9 @@ import ( "github.com/rs/zerolog" "google.golang.org/grpc" "google.golang.org/grpc/codes" + _ "google.golang.org/grpc/encoding/gzip" // Register gzip compression codec "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" - _ "google.golang.org/grpc/encoding/gzip" // Register gzip compression codec dispatchercontracts "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts" sharedcontracts "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1" diff --git a/pkg/config/server/server.go b/pkg/config/server/server.go index 5c39909a2..f7c57c4b8 100644 --- a/pkg/config/server/server.go +++ b/pkg/config/server/server.go @@ -132,6 +132,21 @@ type CronOperationsConfigFile struct { // OLAPAnalyzeCronInterval is the interval for the olap analyze cron operation OLAPAnalyzeCronInterval time.Duration `mapstructure:"olapAnalyzeCronInterval" json:"olapAnalyzeCronInterval,omitempty" default:"3h"` + + // DBHealthMetricsInterval is the interval for collecting database health metrics + DBHealthMetricsInterval time.Duration `mapstructure:"dbHealthMetricsInterval" json:"dbHealthMetricsInterval,omitempty" default:"60s"` + + // OLAPMetricsInterval is the interval for collecting OLAP metrics + OLAPMetricsInterval time.Duration `mapstructure:"olapMetricsInterval" json:"olapMetricsInterval,omitempty" default:"5m"` + + // WorkerMetricsInterval is the interval for collecting worker metrics + WorkerMetricsInterval time.Duration `mapstructure:"workerMetricsInterval" json:"workerMetricsInterval,omitempty" default:"60s"` + + // YesterdayRunCountHour is the hour (0-23) at which to collect yesterday's run count metrics + YesterdayRunCountHour uint `mapstructure:"yesterdayRunCountHour" json:"yesterdayRunCountHour,omitempty" default:"0"` + + // YesterdayRunCountMinute is the minute (0-59) at which to collect yesterday's run count metrics + YesterdayRunCountMinute uint `mapstructure:"yesterdayRunCountMinute" json:"yesterdayRunCountMinute,omitempty" default:"5"` } // OLAPStatusUpdateConfigFile is the configuration for OLAP status updates @@ -873,6 +888,7 @@ func BindAllEnv(v *viper.Viper) { _ = v.BindEnv("otel.traceIdRatio", "SERVER_OTEL_TRACE_ID_RATIO") _ = v.BindEnv("otel.insecure", "SERVER_OTEL_INSECURE") _ = v.BindEnv("otel.collectorAuth", "SERVER_OTEL_COLLECTOR_AUTH") + _ = v.BindEnv("otel.metricsEnabled", "SERVER_OTEL_METRICS_ENABLED") // prometheus options _ = v.BindEnv("prometheus.prometheusServerURL", "SERVER_PROMETHEUS_SERVER_URL") diff --git a/pkg/config/shared/shared.go b/pkg/config/shared/shared.go index 22a5d743d..2ef38d85f 100644 --- a/pkg/config/shared/shared.go +++ b/pkg/config/shared/shared.go @@ -25,6 +25,7 @@ type OpenTelemetryConfigFile struct { TraceIdRatio string `mapstructure:"traceIdRatio" json:"traceIdRatio,omitempty" default:"1"` Insecure bool `mapstructure:"insecure" json:"insecure,omitempty" default:"false"` CollectorAuth string `mapstructure:"collectorAuth" json:"collectorAuth,omitempty"` + MetricsEnabled bool `mapstructure:"metricsEnabled" json:"metricsEnabled,omitempty" default:"false"` } type PrometheusConfigFile struct { diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go index 615621766..87575cebe 100644 --- a/pkg/repository/v1/olap.go +++ b/pkg/repository/v1/olap.go @@ -270,6 +270,10 @@ type OLAPRepository interface { ListWorkflowRunExternalIds(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]pgtype.UUID, error) ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error + + CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context) (int64, error) + CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context) (int64, error) + ListYesterdayRunCountsByStatus(ctx context.Context) (map[sqlcv1.V1ReadableStatusOlap]int64, error) } type StatusUpdateBatchSizeLimits struct { @@ -2461,7 +2465,7 @@ func (r *OLAPRepositoryImpl) PutPayloads(ctx context.Context, tx sqlcv1.DBTX, te for i, opt := range putPayloadOpts { storeOpts := OffloadToExternalStoreOpts{ - TenantId: TenantID(tenantId), + TenantId: tenantId, ExternalID: PayloadExternalId(opt.ExternalId.String()), InsertedAt: opt.InsertedAt, Payload: opt.Payload, @@ -2745,6 +2749,30 @@ func (r *OLAPRepositoryImpl) StatusUpdateBatchSizeLimits() StatusUpdateBatchSize return r.statusUpdateBatchSizeLimits } +func (r *OLAPRepositoryImpl) CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context) (int64, error) { + return r.queries.CountOLAPTempTableSizeForDAGStatusUpdates(ctx, r.readPool) +} + +func (r *OLAPRepositoryImpl) CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context) (int64, error) { + return r.queries.CountOLAPTempTableSizeForTaskStatusUpdates(ctx, r.readPool) +} + +func (r *OLAPRepositoryImpl) ListYesterdayRunCountsByStatus(ctx context.Context) (map[sqlcv1.V1ReadableStatusOlap]int64, error) { + rows, err := r.queries.ListYesterdayRunCountsByStatus(ctx, r.readPool) + + if err != nil { + return nil, err + } + + statusToCount := make(map[sqlcv1.V1ReadableStatusOlap]int64) + + for _, row := range rows { + statusToCount[row.ReadableStatus] = row.Count + } + + return statusToCount, nil +} + type BulkCutOverOLAPPayload struct { TenantID pgtype.UUID InsertedAt pgtype.Timestamptz diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go index dbd903f7f..7d753688a 100644 --- a/pkg/repository/v1/payloadstore.go +++ b/pkg/repository/v1/payloadstore.go @@ -10,15 +10,16 @@ import ( "time" "github.com/google/uuid" - "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" - "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" - "github.com/hatchet-dev/hatchet/pkg/telemetry" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/rs/zerolog" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" + + "github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers" + "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" + "github.com/hatchet-dev/hatchet/pkg/telemetry" ) type StorePayloadOpts struct { diff --git a/pkg/repository/v1/pg_health.go b/pkg/repository/v1/pg_health.go new file mode 100644 index 000000000..379d405c9 --- /dev/null +++ b/pkg/repository/v1/pg_health.go @@ -0,0 +1,273 @@ +package v1 + +import ( + "context" + "time" + + "github.com/hatchet-dev/hatchet/pkg/repository/cache" + "github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1" +) + +// PGHealthRepository provides database health monitoring functionality. +// +// Important: Many health checks rely on PostgreSQL's statistics collector, +// which requires track_counts = on (enabled by default). If track_counts +// is disabled, queries against pg_stat_user_tables and pg_statio_user_tables +// will return empty results, and health checks will report no data available. +// +// To verify track_counts is enabled, run: +// SHOW track_counts; +// +// To enable track_counts (requires superuser or appropriate permissions): +// ALTER SYSTEM SET track_counts = on; +// SELECT pg_reload_conf(); + +type PGHealthError string + +const ( + PGHealthAlert PGHealthError = "alert" + PGHealthWarn PGHealthError = "warn" + PGHealthOK PGHealthError = "ok" +) + +type PGHealthRepository interface { + PGStatStatementsEnabled(ctx context.Context) (bool, error) + TrackCountsEnabled(ctx context.Context) (bool, error) + CheckBloat(ctx context.Context) (PGHealthError, int, error) + GetBloatDetails(ctx context.Context) ([]*sqlcv1.CheckBloatRow, error) + CheckLongRunningQueries(ctx context.Context) (PGHealthError, int, error) + CheckQueryCache(ctx context.Context) (PGHealthError, int, error) + CheckQueryCaches(ctx context.Context) ([]*sqlcv1.CheckQueryCachesRow, error) + CheckLongRunningVacuum(ctx context.Context) (PGHealthError, int, error) + CheckLastAutovacuumForPartitionedTables(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesRow, error) + CheckLastAutovacuumForPartitionedTablesCoreDB(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesCoreDBRow, error) +} + +type pgHealthRepository struct { + *sharedRepository + pgStatStatementsCache *cache.Cache + trackCountsCache *cache.Cache +} + +const ( + pgStatStatementsCacheKey = "pg_stat_statements_enabled" + trackCountsCacheKey = "track_counts_enabled" +) + +func newPGHealthRepository(shared *sharedRepository) *pgHealthRepository { + return &pgHealthRepository{ + sharedRepository: shared, + pgStatStatementsCache: cache.New(10 * time.Minute), + trackCountsCache: cache.New(10 * time.Minute), + } +} + +func (h *pgHealthRepository) PGStatStatementsEnabled(ctx context.Context) (bool, error) { + if cached, ok := h.pgStatStatementsCache.Get(pgStatStatementsCacheKey); ok { + if enabled, ok := cached.(bool); ok { + return enabled, nil + } + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + count, err := h.queries.CheckPGStatStatementsEnabled(ctx, h.pool) + if err != nil { + return false, err + } + + enabled := count > 0 + h.pgStatStatementsCache.Set(pgStatStatementsCacheKey, enabled) + + return enabled, nil +} + +func (h *pgHealthRepository) TrackCountsEnabled(ctx context.Context) (bool, error) { + if cached, ok := h.trackCountsCache.Get(trackCountsCacheKey); ok { + if enabled, ok := cached.(bool); ok { + return enabled, nil + } + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + var setting string + err := h.pool.QueryRow(ctx, "SHOW track_counts").Scan(&setting) + if err != nil { + return false, err + } + + enabled := setting == "on" + h.trackCountsCache.Set(trackCountsCacheKey, enabled) + + return enabled, nil +} + +func (h *pgHealthRepository) CheckBloat(ctx context.Context) (PGHealthError, int, error) { + + enabled, err := h.PGStatStatementsEnabled(ctx) + + if err != nil { + return PGHealthOK, 0, err + } + + if !enabled { + return PGHealthOK, 0, nil + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + rows, err := h.queries.CheckBloat(ctx, h.pool) + if err != nil { + return PGHealthOK, 0, err + } + + if len(rows) == 0 { + return PGHealthOK, 0, nil + } + + return PGHealthWarn, len(rows), nil +} + +func (h *pgHealthRepository) GetBloatDetails(ctx context.Context) ([]*sqlcv1.CheckBloatRow, error) { + enabled, err := h.PGStatStatementsEnabled(ctx) + + if err != nil { + return nil, err + } + + if !enabled { + return nil, nil + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + return h.queries.CheckBloat(ctx, h.pool) +} + +func (h *pgHealthRepository) CheckLongRunningQueries(ctx context.Context) (PGHealthError, int, error) { + + enabled, err := h.PGStatStatementsEnabled(ctx) + + if err != nil { + return PGHealthOK, 0, err + } + + if !enabled { + return PGHealthOK, 0, nil + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + rows, err := h.queries.CheckLongRunningQueries(ctx, h.pool) + if err != nil { + return PGHealthOK, 0, err + } + + return PGHealthOK, len(rows), nil +} + +func (h *pgHealthRepository) CheckQueryCache(ctx context.Context) (PGHealthError, int, error) { + enabled, err := h.PGStatStatementsEnabled(ctx) + + if err != nil { + return PGHealthOK, 0, err + } + + if !enabled { + return PGHealthOK, 0, nil + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + tables, err := h.queries.CheckQueryCaches(ctx, h.pool) + if err != nil { + return PGHealthOK, 0, err + } + + problemTables := make(map[string]float64) + + for _, table := range tables { + hitRatio := table.CacheHitRatioPct + + if hitRatio < 95.0 && hitRatio > 10.0 { + problemTables[table.Tablename.String] = hitRatio + } + } + + if len(problemTables) > 0 { + return PGHealthWarn, len(problemTables), nil + } + + return PGHealthOK, 0, nil +} + +func (h *pgHealthRepository) CheckQueryCaches(ctx context.Context) ([]*sqlcv1.CheckQueryCachesRow, error) { + enabled, err := h.PGStatStatementsEnabled(ctx) + + if err != nil { + return nil, err + } + + if !enabled { + return nil, nil + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + return h.queries.CheckQueryCaches(ctx, h.pool) +} + +func (h *pgHealthRepository) CheckLongRunningVacuum(ctx context.Context) (PGHealthError, int, error) { + + enabled, err := h.PGStatStatementsEnabled(ctx) + + if err != nil { + return PGHealthOK, 0, err + } + + if !enabled { + return PGHealthOK, 0, nil + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + rows, err := h.queries.LongRunningVacuum(ctx, h.pool) + if err != nil { + return PGHealthOK, 0, err + } + + if len(rows) == 0 { + return PGHealthOK, 0, nil + } + + for _, row := range rows { + if row.ElapsedTime > int32(time.Hour.Seconds()*10) { + return PGHealthAlert, len(rows), nil + } + } + + return PGHealthWarn, len(rows), nil +} + +func (h *pgHealthRepository) CheckLastAutovacuumForPartitionedTables(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesRow, error) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + return h.queries.CheckLastAutovacuumForPartitionedTables(ctx, h.pool) +} + +func (h *pgHealthRepository) CheckLastAutovacuumForPartitionedTablesCoreDB(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesCoreDBRow, error) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + return h.queries.CheckLastAutovacuumForPartitionedTablesCoreDB(ctx, h.pool) +} diff --git a/pkg/repository/v1/repository.go b/pkg/repository/v1/repository.go index 1a80b13c3..cd15a1bad 100644 --- a/pkg/repository/v1/repository.go +++ b/pkg/repository/v1/repository.go @@ -35,6 +35,7 @@ type Repository interface { Webhooks() WebhookRepository Idempotency() IdempotencyRepository IntervalSettings() IntervalSettingsRepository + PGHealth() PGHealthRepository } type repositoryImpl struct { @@ -52,6 +53,7 @@ type repositoryImpl struct { payloadStore PayloadStoreRepository idempotency IdempotencyRepository intervals IntervalSettingsRepository + pgHealth PGHealthRepository } func NewRepository( @@ -83,6 +85,7 @@ func NewRepository( payloadStore: shared.payloadStore, idempotency: newIdempotencyRepository(shared), intervals: newIntervalSettingsRepository(shared), + pgHealth: newPGHealthRepository(shared), } return impl, func() error { @@ -157,3 +160,7 @@ func (r *repositoryImpl) Idempotency() IdempotencyRepository { func (r *repositoryImpl) IntervalSettings() IntervalSettingsRepository { return r.intervals } + +func (r *repositoryImpl) PGHealth() PGHealthRepository { + return r.pgHealth +} diff --git a/pkg/repository/v1/sqlcv1/models.go b/pkg/repository/v1/sqlcv1/models.go index 0a7305cde..4251c6429 100644 --- a/pkg/repository/v1/sqlcv1/models.go +++ b/pkg/repository/v1/sqlcv1/models.go @@ -7,6 +7,7 @@ package sqlcv1 import ( "database/sql/driver" "fmt" + "net/netip" "github.com/jackc/pgx/v5/pgtype" ) @@ -2432,6 +2433,102 @@ type MessageQueueItem struct { Status MessageQueueItemStatus `json:"status"` } +type PgAvailableExtensions struct { + Name pgtype.Text `json:"name"` + DefaultVersion pgtype.Text `json:"default_version"` + InstalledVersion pgtype.Text `json:"installed_version"` + Comment pgtype.Text `json:"comment"` +} + +type PgStatActivity struct { + Pid pgtype.Int4 `json:"pid"` + Usename pgtype.Text `json:"usename"` + ApplicationName pgtype.Text `json:"application_name"` + ClientAddr *netip.Addr `json:"client_addr"` + State pgtype.Text `json:"state"` + QueryStart pgtype.Timestamptz `json:"query_start"` + WaitEventType pgtype.Text `json:"wait_event_type"` + WaitEvent pgtype.Text `json:"wait_event"` + Query pgtype.Text `json:"query"` +} + +type PgStatProgressVacuum struct { + Pid pgtype.Int4 `json:"pid"` + Datid pgtype.Uint32 `json:"datid"` + Datname pgtype.Text `json:"datname"` + Relid pgtype.Uint32 `json:"relid"` + Phase pgtype.Text `json:"phase"` + HeapBlksTotal pgtype.Int8 `json:"heap_blks_total"` + HeapBlksScanned pgtype.Int8 `json:"heap_blks_scanned"` + HeapBlksVacuumed pgtype.Int8 `json:"heap_blks_vacuumed"` + HeapBlksFrozen pgtype.Int8 `json:"heap_blks_frozen"` + IndexVacuumCount pgtype.Int8 `json:"index_vacuum_count"` + MaxDeadTuples pgtype.Int8 `json:"max_dead_tuples"` + NumDeadTuples pgtype.Int8 `json:"num_dead_tuples"` +} + +type PgStatStatements struct { + Userid pgtype.Uint32 `json:"userid"` + Dbid pgtype.Uint32 `json:"dbid"` + Queryid pgtype.Int8 `json:"queryid"` + Query pgtype.Text `json:"query"` + Calls pgtype.Int8 `json:"calls"` + TotalExecTime pgtype.Float8 `json:"total_exec_time"` + Rows pgtype.Int8 `json:"rows"` + SharedBlksHit pgtype.Int8 `json:"shared_blks_hit"` + SharedBlksRead pgtype.Int8 `json:"shared_blks_read"` + SharedBlksDirtied pgtype.Int8 `json:"shared_blks_dirtied"` + SharedBlksWritten pgtype.Int8 `json:"shared_blks_written"` + LocalBlksHit pgtype.Int8 `json:"local_blks_hit"` + LocalBlksRead pgtype.Int8 `json:"local_blks_read"` + LocalBlksDirtied pgtype.Int8 `json:"local_blks_dirtied"` + LocalBlksWritten pgtype.Int8 `json:"local_blks_written"` + TempBlksRead pgtype.Int8 `json:"temp_blks_read"` + TempBlksWritten pgtype.Int8 `json:"temp_blks_written"` + BlkReadTime pgtype.Float8 `json:"blk_read_time"` + BlkWriteTime pgtype.Float8 `json:"blk_write_time"` +} + +type PgStatUserTables struct { + Relid pgtype.Uint32 `json:"relid"` + Schemaname pgtype.Text `json:"schemaname"` + Relname pgtype.Text `json:"relname"` + SeqScan pgtype.Int8 `json:"seq_scan"` + SeqTupRead pgtype.Int8 `json:"seq_tup_read"` + IdxScan pgtype.Int8 `json:"idx_scan"` + IdxTupFetch pgtype.Int8 `json:"idx_tup_fetch"` + NTupIns pgtype.Int8 `json:"n_tup_ins"` + NTupUpd pgtype.Int8 `json:"n_tup_upd"` + NTupDel pgtype.Int8 `json:"n_tup_del"` + NTupHotUpd pgtype.Int8 `json:"n_tup_hot_upd"` + NLiveTup pgtype.Int8 `json:"n_live_tup"` + NDeadTup pgtype.Int8 `json:"n_dead_tup"` + NModSinceAnalyze pgtype.Int8 `json:"n_mod_since_analyze"` + NInsSinceVacuum pgtype.Int8 `json:"n_ins_since_vacuum"` + VacuumCount pgtype.Int8 `json:"vacuum_count"` + AutovacuumCount pgtype.Int8 `json:"autovacuum_count"` + AnalyzeCount pgtype.Int8 `json:"analyze_count"` + AutoanalyzeCount pgtype.Int8 `json:"autoanalyze_count"` + LastVacuum pgtype.Timestamptz `json:"last_vacuum"` + LastAutovacuum pgtype.Timestamptz `json:"last_autovacuum"` + LastAnalyze pgtype.Timestamptz `json:"last_analyze"` + LastAutoanalyze pgtype.Timestamptz `json:"last_autoanalyze"` +} + +type PgStatioUserTables struct { + Relid pgtype.Uint32 `json:"relid"` + Schemaname pgtype.Text `json:"schemaname"` + Relname pgtype.Text `json:"relname"` + HeapBlksRead pgtype.Int8 `json:"heap_blks_read"` + HeapBlksHit pgtype.Int8 `json:"heap_blks_hit"` + IdxBlksRead pgtype.Int8 `json:"idx_blks_read"` + IdxBlksHit pgtype.Int8 `json:"idx_blks_hit"` + ToastBlksRead pgtype.Int8 `json:"toast_blks_read"` + ToastBlksHit pgtype.Int8 `json:"toast_blks_hit"` + TidxBlksRead pgtype.Int8 `json:"tidx_blks_read"` + TidxBlksHit pgtype.Int8 `json:"tidx_blks_hit"` +} + type Queue struct { ID int64 `json:"id"` TenantId pgtype.UUID `json:"tenantId"` diff --git a/pkg/repository/v1/sqlcv1/olap.sql b/pkg/repository/v1/sqlcv1/olap.sql index ec7bccfd7..b4c383fec 100644 --- a/pkg/repository/v1/sqlcv1/olap.sql +++ b/pkg/repository/v1/sqlcv1/olap.sql @@ -1989,6 +1989,37 @@ WHERE ) ; +-- name: CountOLAPTempTableSizeForDAGStatusUpdates :one +SELECT COUNT(*) AS total +FROM v1_task_status_updates_tmp +; + +-- name: CountOLAPTempTableSizeForTaskStatusUpdates :one +SELECT COUNT(*) AS total +FROM v1_task_events_olap_tmp +; + +-- name: ListYesterdayRunCountsByStatus :many +SELECT readable_status, COUNT(*) +FROM v1_runs_olap +WHERE inserted_at::DATE = (NOW() - INTERVAL '1 day')::DATE +GROUP BY readable_status +; + +-- name: CheckLastAutovacuumForPartitionedTables :many +SELECT + s.schemaname, + s.relname AS tablename, + s.last_autovacuum, + EXTRACT(EPOCH FROM (NOW() - s.last_autovacuum)) AS seconds_since_last_autovacuum +FROM pg_stat_user_tables s +JOIN pg_catalog.pg_class c ON c.oid = (quote_ident(s.schemaname)||'.'||quote_ident(s.relname))::regclass +WHERE s.schemaname = 'public' + AND c.relispartition = true + AND c.relkind = 'r' +ORDER BY s.last_autovacuum ASC NULLS LAST +; + -- name: ListPaginatedOLAPPayloadsForOffload :many WITH payloads AS ( SELECT diff --git a/pkg/repository/v1/sqlcv1/olap.sql.go b/pkg/repository/v1/sqlcv1/olap.sql.go index 4e2597615..95830ceae 100644 --- a/pkg/repository/v1/sqlcv1/olap.sql.go +++ b/pkg/repository/v1/sqlcv1/olap.sql.go @@ -158,6 +158,52 @@ type BulkCreateEventTriggersParams struct { FilterID pgtype.UUID `json:"filter_id"` } +const checkLastAutovacuumForPartitionedTables = `-- name: CheckLastAutovacuumForPartitionedTables :many +SELECT + s.schemaname, + s.relname AS tablename, + s.last_autovacuum, + EXTRACT(EPOCH FROM (NOW() - s.last_autovacuum)) AS seconds_since_last_autovacuum +FROM pg_stat_user_tables s +JOIN pg_catalog.pg_class c ON c.oid = (quote_ident(s.schemaname)||'.'||quote_ident(s.relname))::regclass +WHERE s.schemaname = 'public' + AND c.relispartition = true + AND c.relkind = 'r' +ORDER BY s.last_autovacuum ASC NULLS LAST +` + +type CheckLastAutovacuumForPartitionedTablesRow struct { + Schemaname pgtype.Text `json:"schemaname"` + Tablename pgtype.Text `json:"tablename"` + LastAutovacuum pgtype.Timestamptz `json:"last_autovacuum"` + SecondsSinceLastAutovacuum pgtype.Numeric `json:"seconds_since_last_autovacuum"` +} + +func (q *Queries) CheckLastAutovacuumForPartitionedTables(ctx context.Context, db DBTX) ([]*CheckLastAutovacuumForPartitionedTablesRow, error) { + rows, err := db.Query(ctx, checkLastAutovacuumForPartitionedTables) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*CheckLastAutovacuumForPartitionedTablesRow + for rows.Next() { + var i CheckLastAutovacuumForPartitionedTablesRow + if err := rows.Scan( + &i.Schemaname, + &i.Tablename, + &i.LastAutovacuum, + &i.SecondsSinceLastAutovacuum, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const cleanUpOLAPCutoverJobOffsets = `-- name: CleanUpOLAPCutoverJobOffsets :exec DELETE FROM v1_payload_cutover_job_offset WHERE NOT key = ANY($1::DATE[]) @@ -255,6 +301,30 @@ func (q *Queries) CountEvents(ctx context.Context, db DBTX, arg CountEventsParam return count, err } +const countOLAPTempTableSizeForDAGStatusUpdates = `-- name: CountOLAPTempTableSizeForDAGStatusUpdates :one +SELECT COUNT(*) AS total +FROM v1_task_status_updates_tmp +` + +func (q *Queries) CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context, db DBTX) (int64, error) { + row := db.QueryRow(ctx, countOLAPTempTableSizeForDAGStatusUpdates) + var total int64 + err := row.Scan(&total) + return total, err +} + +const countOLAPTempTableSizeForTaskStatusUpdates = `-- name: CountOLAPTempTableSizeForTaskStatusUpdates :one +SELECT COUNT(*) AS total +FROM v1_task_events_olap_tmp +` + +func (q *Queries) CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context, db DBTX) (int64, error) { + row := db.QueryRow(ctx, countOLAPTempTableSizeForTaskStatusUpdates) + var total int64 + err := row.Scan(&total) + return total, err +} + type CreateDAGsOLAPParams struct { TenantID pgtype.UUID `json:"tenant_id"` ID int64 `json:"id"` @@ -1948,6 +2018,38 @@ func (q *Queries) ListWorkflowRunExternalIds(ctx context.Context, db DBTX, arg L return items, nil } +const listYesterdayRunCountsByStatus = `-- name: ListYesterdayRunCountsByStatus :many +SELECT readable_status, COUNT(*) +FROM v1_runs_olap +WHERE inserted_at::DATE = (NOW() - INTERVAL '1 day')::DATE +GROUP BY readable_status +` + +type ListYesterdayRunCountsByStatusRow struct { + ReadableStatus V1ReadableStatusOlap `json:"readable_status"` + Count int64 `json:"count"` +} + +func (q *Queries) ListYesterdayRunCountsByStatus(ctx context.Context, db DBTX) ([]*ListYesterdayRunCountsByStatusRow, error) { + rows, err := db.Query(ctx, listYesterdayRunCountsByStatus) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*ListYesterdayRunCountsByStatusRow + for rows.Next() { + var i ListYesterdayRunCountsByStatusRow + if err := rows.Scan(&i.ReadableStatus, &i.Count); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const markOLAPCutoverJobAsCompleted = `-- name: MarkOLAPCutoverJobAsCompleted :exec UPDATE v1_payloads_olap_cutover_job_offset SET is_completed = TRUE diff --git a/pkg/repository/v1/sqlcv1/pg_health.sql b/pkg/repository/v1/sqlcv1/pg_health.sql new file mode 100644 index 000000000..da830c201 --- /dev/null +++ b/pkg/repository/v1/sqlcv1/pg_health.sql @@ -0,0 +1,95 @@ +-- name: CheckPGStatStatementsEnabled :one +SELECT COUNT(*) FROM pg_available_extensions WHERE name = 'pg_stat_statements'; + +-- name: CheckBloat :many +-- Note: Requires track_counts = on (enabled by default in PostgreSQL) +SELECT + schemaname, + relname AS tablename, + pg_size_pretty(pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS total_size, + pg_size_pretty(pg_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS table_size, + n_live_tup AS live_tuples, + n_dead_tup AS dead_tuples, + ROUND(100 * n_dead_tup::numeric / NULLIF(n_live_tup, 0), 2) AS dead_pct, + last_vacuum, + last_autovacuum, + last_analyze +FROM + pg_stat_user_tables +WHERE + n_dead_tup > 1000 + AND n_live_tup > 1000 + AND ROUND(100 * n_dead_tup::numeric / NULLIF(n_live_tup, 0), 2) > 50 + AND relname NOT IN ( + 'Lease' + ) +ORDER BY + dead_pct DESC NULLS LAST; + +-- name: CheckLongRunningQueries :many +SELECT + pid, + usename, + application_name, + client_addr, + state, + now() - query_start AS duration, + query +FROM + pg_stat_activity +WHERE + state = 'active' + AND now() - query_start > interval '5 minutes' + AND query NOT LIKE 'autovacuum:%' + AND query NOT LIKE '%pg_stat_activity%' +ORDER BY + query_start; + +-- name: CheckQueryCaches :many +-- Note: Requires track_counts = on (enabled by default in PostgreSQL) +SELECT + schemaname, + relname AS tablename, + heap_blks_read, + heap_blks_hit, + heap_blks_hit + heap_blks_read AS total_reads, + ROUND( + 100.0 * heap_blks_hit / NULLIF(heap_blks_hit + heap_blks_read, 0), + 2 + )::float AS cache_hit_ratio_pct, + pg_size_pretty(pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS total_size +FROM + pg_statio_user_tables +WHERE + heap_blks_read + heap_blks_hit > 0 +ORDER BY + heap_blks_read DESC +LIMIT 20; + +-- name: LongRunningVacuum :many +SELECT + p.pid, + p.relid::regclass AS table_name, + pg_size_pretty(pg_total_relation_size(p.relid)) AS table_size, + p.phase, + p.heap_blks_total AS total_blocks, + p.heap_blks_scanned AS scanned_blocks, + ROUND(100.0 * p.heap_blks_scanned / NULLIF(p.heap_blks_total, 0), 2) AS pct_scanned, + p.heap_blks_vacuumed AS vacuumed_blocks, + p.index_vacuum_count, + now() - a.query_start AS elapsed_time, + CASE + WHEN p.heap_blks_scanned > 0 THEN + ((now() - a.query_start) * (p.heap_blks_total - p.heap_blks_scanned) / p.heap_blks_scanned) + ELSE interval '0' + END AS estimated_time_remaining, + a.wait_event_type, + a.wait_event, + a.query +FROM + pg_stat_progress_vacuum p + JOIN pg_stat_activity a ON p.pid = a.pid +WHERE + now() - a.query_start > interval '3 hours' +ORDER BY + a.query_start; diff --git a/pkg/repository/v1/sqlcv1/pg_health.sql.go b/pkg/repository/v1/sqlcv1/pg_health.sql.go new file mode 100644 index 000000000..33beb1f71 --- /dev/null +++ b/pkg/repository/v1/sqlcv1/pg_health.sql.go @@ -0,0 +1,293 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 +// source: pg_health.sql + +package sqlcv1 + +import ( + "context" + "net/netip" + + "github.com/jackc/pgx/v5/pgtype" +) + +const checkBloat = `-- name: CheckBloat :many +SELECT + schemaname, + relname AS tablename, + pg_size_pretty(pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS total_size, + pg_size_pretty(pg_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS table_size, + n_live_tup AS live_tuples, + n_dead_tup AS dead_tuples, + ROUND(100 * n_dead_tup::numeric / NULLIF(n_live_tup, 0), 2) AS dead_pct, + last_vacuum, + last_autovacuum, + last_analyze +FROM + pg_stat_user_tables +WHERE + n_dead_tup > 1000 + AND n_live_tup > 1000 + AND ROUND(100 * n_dead_tup::numeric / NULLIF(n_live_tup, 0), 2) > 50 + AND relname NOT IN ( + 'Lease' + ) +ORDER BY + dead_pct DESC NULLS LAST +` + +type CheckBloatRow struct { + Schemaname pgtype.Text `json:"schemaname"` + Tablename pgtype.Text `json:"tablename"` + TotalSize string `json:"total_size"` + TableSize string `json:"table_size"` + LiveTuples pgtype.Int8 `json:"live_tuples"` + DeadTuples pgtype.Int8 `json:"dead_tuples"` + DeadPct pgtype.Numeric `json:"dead_pct"` + LastVacuum pgtype.Timestamptz `json:"last_vacuum"` + LastAutovacuum pgtype.Timestamptz `json:"last_autovacuum"` + LastAnalyze pgtype.Timestamptz `json:"last_analyze"` +} + +// Note: Requires track_counts = on (enabled by default in PostgreSQL) +func (q *Queries) CheckBloat(ctx context.Context, db DBTX) ([]*CheckBloatRow, error) { + rows, err := db.Query(ctx, checkBloat) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*CheckBloatRow + for rows.Next() { + var i CheckBloatRow + if err := rows.Scan( + &i.Schemaname, + &i.Tablename, + &i.TotalSize, + &i.TableSize, + &i.LiveTuples, + &i.DeadTuples, + &i.DeadPct, + &i.LastVacuum, + &i.LastAutovacuum, + &i.LastAnalyze, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const checkLongRunningQueries = `-- name: CheckLongRunningQueries :many +SELECT + pid, + usename, + application_name, + client_addr, + state, + now() - query_start AS duration, + query +FROM + pg_stat_activity +WHERE + state = 'active' + AND now() - query_start > interval '5 minutes' + AND query NOT LIKE 'autovacuum:%' + AND query NOT LIKE '%pg_stat_activity%' +ORDER BY + query_start +` + +type CheckLongRunningQueriesRow struct { + Pid pgtype.Int4 `json:"pid"` + Usename pgtype.Text `json:"usename"` + ApplicationName pgtype.Text `json:"application_name"` + ClientAddr *netip.Addr `json:"client_addr"` + State pgtype.Text `json:"state"` + Duration int32 `json:"duration"` + Query pgtype.Text `json:"query"` +} + +func (q *Queries) CheckLongRunningQueries(ctx context.Context, db DBTX) ([]*CheckLongRunningQueriesRow, error) { + rows, err := db.Query(ctx, checkLongRunningQueries) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*CheckLongRunningQueriesRow + for rows.Next() { + var i CheckLongRunningQueriesRow + if err := rows.Scan( + &i.Pid, + &i.Usename, + &i.ApplicationName, + &i.ClientAddr, + &i.State, + &i.Duration, + &i.Query, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const checkPGStatStatementsEnabled = `-- name: CheckPGStatStatementsEnabled :one +SELECT COUNT(*) FROM pg_available_extensions WHERE name = 'pg_stat_statements' +` + +func (q *Queries) CheckPGStatStatementsEnabled(ctx context.Context, db DBTX) (int64, error) { + row := db.QueryRow(ctx, checkPGStatStatementsEnabled) + var count int64 + err := row.Scan(&count) + return count, err +} + +const checkQueryCaches = `-- name: CheckQueryCaches :many +SELECT + schemaname, + relname AS tablename, + heap_blks_read, + heap_blks_hit, + heap_blks_hit + heap_blks_read AS total_reads, + ROUND( + 100.0 * heap_blks_hit / NULLIF(heap_blks_hit + heap_blks_read, 0), + 2 + )::float AS cache_hit_ratio_pct, + pg_size_pretty(pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS total_size +FROM + pg_statio_user_tables +WHERE + heap_blks_read + heap_blks_hit > 0 +ORDER BY + heap_blks_read DESC +LIMIT 20 +` + +type CheckQueryCachesRow struct { + Schemaname pgtype.Text `json:"schemaname"` + Tablename pgtype.Text `json:"tablename"` + HeapBlksRead pgtype.Int8 `json:"heap_blks_read"` + HeapBlksHit pgtype.Int8 `json:"heap_blks_hit"` + TotalReads int32 `json:"total_reads"` + CacheHitRatioPct float64 `json:"cache_hit_ratio_pct"` + TotalSize string `json:"total_size"` +} + +// Note: Requires track_counts = on (enabled by default in PostgreSQL) +func (q *Queries) CheckQueryCaches(ctx context.Context, db DBTX) ([]*CheckQueryCachesRow, error) { + rows, err := db.Query(ctx, checkQueryCaches) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*CheckQueryCachesRow + for rows.Next() { + var i CheckQueryCachesRow + if err := rows.Scan( + &i.Schemaname, + &i.Tablename, + &i.HeapBlksRead, + &i.HeapBlksHit, + &i.TotalReads, + &i.CacheHitRatioPct, + &i.TotalSize, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const longRunningVacuum = `-- name: LongRunningVacuum :many +SELECT + p.pid, + p.relid::regclass AS table_name, + pg_size_pretty(pg_total_relation_size(p.relid)) AS table_size, + p.phase, + p.heap_blks_total AS total_blocks, + p.heap_blks_scanned AS scanned_blocks, + ROUND(100.0 * p.heap_blks_scanned / NULLIF(p.heap_blks_total, 0), 2) AS pct_scanned, + p.heap_blks_vacuumed AS vacuumed_blocks, + p.index_vacuum_count, + now() - a.query_start AS elapsed_time, + CASE + WHEN p.heap_blks_scanned > 0 THEN + ((now() - a.query_start) * (p.heap_blks_total - p.heap_blks_scanned) / p.heap_blks_scanned) + ELSE interval '0' + END AS estimated_time_remaining, + a.wait_event_type, + a.wait_event, + a.query +FROM + pg_stat_progress_vacuum p + JOIN pg_stat_activity a ON p.pid = a.pid +WHERE + now() - a.query_start > interval '3 hours' +ORDER BY + a.query_start +` + +type LongRunningVacuumRow struct { + Pid pgtype.Int4 `json:"pid"` + TableName interface{} `json:"table_name"` + TableSize string `json:"table_size"` + Phase pgtype.Text `json:"phase"` + TotalBlocks pgtype.Int8 `json:"total_blocks"` + ScannedBlocks pgtype.Int8 `json:"scanned_blocks"` + PctScanned pgtype.Numeric `json:"pct_scanned"` + VacuumedBlocks pgtype.Int8 `json:"vacuumed_blocks"` + IndexVacuumCount pgtype.Int8 `json:"index_vacuum_count"` + ElapsedTime int32 `json:"elapsed_time"` + EstimatedTimeRemaining pgtype.Interval `json:"estimated_time_remaining"` + WaitEventType pgtype.Text `json:"wait_event_type"` + WaitEvent pgtype.Text `json:"wait_event"` + Query pgtype.Text `json:"query"` +} + +func (q *Queries) LongRunningVacuum(ctx context.Context, db DBTX) ([]*LongRunningVacuumRow, error) { + rows, err := db.Query(ctx, longRunningVacuum) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*LongRunningVacuumRow + for rows.Next() { + var i LongRunningVacuumRow + if err := rows.Scan( + &i.Pid, + &i.TableName, + &i.TableSize, + &i.Phase, + &i.TotalBlocks, + &i.ScannedBlocks, + &i.PctScanned, + &i.VacuumedBlocks, + &i.IndexVacuumCount, + &i.ElapsedTime, + &i.EstimatedTimeRemaining, + &i.WaitEventType, + &i.WaitEvent, + &i.Query, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/pkg/repository/v1/sqlcv1/sqlc.yaml b/pkg/repository/v1/sqlcv1/sqlc.yaml index a6ba90f7a..f22848ca1 100644 --- a/pkg/repository/v1/sqlcv1/sqlc.yaml +++ b/pkg/repository/v1/sqlcv1/sqlc.yaml @@ -23,10 +23,12 @@ sql: - webhooks.sql - idempotency-keys.sql - interval_settings.sql + - pg_health.sql schema: - ../../../../sql/schema/v0.sql - ../../../../sql/schema/v1-core.sql - ../../../../sql/schema/v1-olap.sql + - ../../../../sql/schema/pg-stubs.sql - ./concurrency-additional-tables.sql strict_order_by: false gen: diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index d2f9b9191..6122e825f 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -1124,3 +1124,29 @@ SELECT count, oldest::TIMESTAMPTZ FROM running_tasks; + +-- name: FindOldestRunningTask :one +SELECT * +FROM v1_task_runtime +ORDER BY task_id, task_inserted_at +LIMIT 1; + +-- name: FindOldestTask :one +SELECT * +FROM v1_task +ORDER BY id, inserted_at +LIMIT 1; + +-- name: CheckLastAutovacuumForPartitionedTablesCoreDB :many +SELECT + s.schemaname, + s.relname AS tablename, + s.last_autovacuum, + EXTRACT(EPOCH FROM (NOW() - s.last_autovacuum)) AS seconds_since_last_autovacuum +FROM pg_stat_user_tables s +JOIN pg_catalog.pg_class c ON c.oid = (quote_ident(s.schemaname)||'.'||quote_ident(s.relname))::regclass +WHERE s.schemaname = 'public' + AND c.relispartition = true + AND c.relkind = 'r' +ORDER BY s.last_autovacuum ASC NULLS LAST +; diff --git a/pkg/repository/v1/sqlcv1/tasks.sql.go b/pkg/repository/v1/sqlcv1/tasks.sql.go index efdf6767d..0d069f730 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql.go +++ b/pkg/repository/v1/sqlcv1/tasks.sql.go @@ -39,6 +39,52 @@ func (q *Queries) AnalyzeV1TaskEvent(ctx context.Context, db DBTX) error { return err } +const checkLastAutovacuumForPartitionedTablesCoreDB = `-- name: CheckLastAutovacuumForPartitionedTablesCoreDB :many +SELECT + s.schemaname, + s.relname AS tablename, + s.last_autovacuum, + EXTRACT(EPOCH FROM (NOW() - s.last_autovacuum)) AS seconds_since_last_autovacuum +FROM pg_stat_user_tables s +JOIN pg_catalog.pg_class c ON c.oid = (quote_ident(s.schemaname)||'.'||quote_ident(s.relname))::regclass +WHERE s.schemaname = 'public' + AND c.relispartition = true + AND c.relkind = 'r' +ORDER BY s.last_autovacuum ASC NULLS LAST +` + +type CheckLastAutovacuumForPartitionedTablesCoreDBRow struct { + Schemaname pgtype.Text `json:"schemaname"` + Tablename pgtype.Text `json:"tablename"` + LastAutovacuum pgtype.Timestamptz `json:"last_autovacuum"` + SecondsSinceLastAutovacuum pgtype.Numeric `json:"seconds_since_last_autovacuum"` +} + +func (q *Queries) CheckLastAutovacuumForPartitionedTablesCoreDB(ctx context.Context, db DBTX) ([]*CheckLastAutovacuumForPartitionedTablesCoreDBRow, error) { + rows, err := db.Query(ctx, checkLastAutovacuumForPartitionedTablesCoreDB) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*CheckLastAutovacuumForPartitionedTablesCoreDBRow + for rows.Next() { + var i CheckLastAutovacuumForPartitionedTablesCoreDBRow + if err := rows.Scan( + &i.Schemaname, + &i.Tablename, + &i.LastAutovacuum, + &i.SecondsSinceLastAutovacuum, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const cleanupV1ConcurrencySlot = `-- name: CleanupV1ConcurrencySlot :execresult WITH locked_cs AS ( SELECT cs.task_id, cs.task_inserted_at, cs.task_retry_count @@ -439,6 +485,79 @@ func (q *Queries) FailTaskInternalFailure(ctx context.Context, db DBTX, arg Fail return items, nil } +const findOldestRunningTask = `-- name: FindOldestRunningTask :one +SELECT task_id, task_inserted_at, retry_count, worker_id, tenant_id, timeout_at +FROM v1_task_runtime +ORDER BY task_id, task_inserted_at +LIMIT 1 +` + +func (q *Queries) FindOldestRunningTask(ctx context.Context, db DBTX) (*V1TaskRuntime, error) { + row := db.QueryRow(ctx, findOldestRunningTask) + var i V1TaskRuntime + err := row.Scan( + &i.TaskID, + &i.TaskInsertedAt, + &i.RetryCount, + &i.WorkerID, + &i.TenantID, + &i.TimeoutAt, + ) + return &i, err +} + +const findOldestTask = `-- name: FindOldestTask :one +SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff +FROM v1_task +ORDER BY id, inserted_at +LIMIT 1 +` + +func (q *Queries) FindOldestTask(ctx context.Context, db DBTX) (*V1Task, error) { + row := db.QueryRow(ctx, findOldestTask) + var i V1Task + err := row.Scan( + &i.ID, + &i.InsertedAt, + &i.TenantID, + &i.Queue, + &i.ActionID, + &i.StepID, + &i.StepReadableID, + &i.WorkflowID, + &i.WorkflowVersionID, + &i.WorkflowRunID, + &i.ScheduleTimeout, + &i.StepTimeout, + &i.Priority, + &i.Sticky, + &i.DesiredWorkerID, + &i.ExternalID, + &i.DisplayName, + &i.Input, + &i.RetryCount, + &i.InternalRetryCount, + &i.AppRetryCount, + &i.StepIndex, + &i.AdditionalMetadata, + &i.DagID, + &i.DagInsertedAt, + &i.ParentTaskExternalID, + &i.ParentTaskID, + &i.ParentTaskInsertedAt, + &i.ChildIndex, + &i.ChildKey, + &i.InitialState, + &i.InitialStateReason, + &i.ConcurrencyParentStrategyIds, + &i.ConcurrencyStrategyIds, + &i.ConcurrencyKeys, + &i.RetryBackoffFactor, + &i.RetryMaxBackoff, + ) + return &i, err +} + const flattenExternalIds = `-- name: FlattenExternalIds :many WITH lookup_rows AS ( SELECT diff --git a/pkg/repository/v1/sqlcv1/workers.sql b/pkg/repository/v1/sqlcv1/workers.sql index 3d3863786..5e0333458 100644 --- a/pkg/repository/v1/sqlcv1/workers.sql +++ b/pkg/repository/v1/sqlcv1/workers.sql @@ -83,3 +83,42 @@ WHERE AND runtime.worker_id = @workerId::uuid LIMIT COALESCE(sqlc.narg('limit')::int, 100); + +-- name: ListTotalActiveSlotsPerTenant :many +SELECT "tenantId", SUM("maxRuns") AS "totalActiveSlots" +FROM "Worker" +WHERE + "dispatcherId" IS NOT NULL + AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds' + AND "isActive" = true + AND "isPaused" = false +GROUP BY "tenantId" +; + +-- name: ListActiveSDKsPerTenant :many +SELECT + "tenantId", + COALESCE("language"::TEXT, 'unknown')::TEXT AS "language", + COALESCE("languageVersion", 'unknown') AS "languageVersion", + COALESCE("sdkVersion", 'unknown') AS "sdkVersion", + COALESCE("os", 'unknown') AS "os", + COUNT(*) AS "count" +FROM "Worker" +WHERE + "dispatcherId" IS NOT NULL + AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds' + AND "isActive" = true + AND "isPaused" = false +GROUP BY "tenantId", "language", "languageVersion", "sdkVersion", "os" +; + +-- name: ListActiveWorkersPerTenant :many +SELECT "tenantId", COUNT(*) +FROM "Worker" +WHERE + "dispatcherId" IS NOT NULL + AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds' + AND "isActive" = true + AND "isPaused" = false +GROUP BY "tenantId" +; diff --git a/pkg/repository/v1/sqlcv1/workers.sql.go b/pkg/repository/v1/sqlcv1/workers.sql.go index 2590fdd46..7c12a0770 100644 --- a/pkg/repository/v1/sqlcv1/workers.sql.go +++ b/pkg/repository/v1/sqlcv1/workers.sql.go @@ -65,6 +65,95 @@ func (q *Queries) GetWorkerById(ctx context.Context, db DBTX, id pgtype.UUID) (* return &i, err } +const listActiveSDKsPerTenant = `-- name: ListActiveSDKsPerTenant :many +SELECT + "tenantId", + COALESCE("language"::TEXT, 'unknown')::TEXT AS "language", + COALESCE("languageVersion", 'unknown') AS "languageVersion", + COALESCE("sdkVersion", 'unknown') AS "sdkVersion", + COALESCE("os", 'unknown') AS "os", + COUNT(*) AS "count" +FROM "Worker" +WHERE + "dispatcherId" IS NOT NULL + AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds' + AND "isActive" = true + AND "isPaused" = false +GROUP BY "tenantId", "language", "languageVersion", "sdkVersion", "os" +` + +type ListActiveSDKsPerTenantRow struct { + TenantId pgtype.UUID `json:"tenantId"` + Language string `json:"language"` + LanguageVersion string `json:"languageVersion"` + SdkVersion string `json:"sdkVersion"` + Os string `json:"os"` + Count int64 `json:"count"` +} + +func (q *Queries) ListActiveSDKsPerTenant(ctx context.Context, db DBTX) ([]*ListActiveSDKsPerTenantRow, error) { + rows, err := db.Query(ctx, listActiveSDKsPerTenant) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*ListActiveSDKsPerTenantRow + for rows.Next() { + var i ListActiveSDKsPerTenantRow + if err := rows.Scan( + &i.TenantId, + &i.Language, + &i.LanguageVersion, + &i.SdkVersion, + &i.Os, + &i.Count, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listActiveWorkersPerTenant = `-- name: ListActiveWorkersPerTenant :many +SELECT "tenantId", COUNT(*) +FROM "Worker" +WHERE + "dispatcherId" IS NOT NULL + AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds' + AND "isActive" = true + AND "isPaused" = false +GROUP BY "tenantId" +` + +type ListActiveWorkersPerTenantRow struct { + TenantId pgtype.UUID `json:"tenantId"` + Count int64 `json:"count"` +} + +func (q *Queries) ListActiveWorkersPerTenant(ctx context.Context, db DBTX) ([]*ListActiveWorkersPerTenantRow, error) { + rows, err := db.Query(ctx, listActiveWorkersPerTenant) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*ListActiveWorkersPerTenantRow + for rows.Next() { + var i ListActiveWorkersPerTenantRow + if err := rows.Scan(&i.TenantId, &i.Count); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listManyWorkerLabels = `-- name: ListManyWorkerLabels :many SELECT "id", @@ -246,6 +335,42 @@ func (q *Queries) ListSemaphoreSlotsWithStateForWorker(ctx context.Context, db D return items, nil } +const listTotalActiveSlotsPerTenant = `-- name: ListTotalActiveSlotsPerTenant :many +SELECT "tenantId", SUM("maxRuns") AS "totalActiveSlots" +FROM "Worker" +WHERE + "dispatcherId" IS NOT NULL + AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds' + AND "isActive" = true + AND "isPaused" = false +GROUP BY "tenantId" +` + +type ListTotalActiveSlotsPerTenantRow struct { + TenantId pgtype.UUID `json:"tenantId"` + TotalActiveSlots int64 `json:"totalActiveSlots"` +} + +func (q *Queries) ListTotalActiveSlotsPerTenant(ctx context.Context, db DBTX) ([]*ListTotalActiveSlotsPerTenantRow, error) { + rows, err := db.Query(ctx, listTotalActiveSlotsPerTenant) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*ListTotalActiveSlotsPerTenantRow + for rows.Next() { + var i ListTotalActiveSlotsPerTenantRow + if err := rows.Scan(&i.TenantId, &i.TotalActiveSlots); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listWorkersWithSlotCount = `-- name: ListWorkersWithSlotCount :many SELECT workers.id, workers."createdAt", workers."updatedAt", workers."deletedAt", workers."tenantId", workers."lastHeartbeatAt", workers.name, workers."dispatcherId", workers."maxRuns", workers."isActive", workers."lastListenerEstablished", workers."isPaused", workers.type, workers."webhookId", workers.language, workers."languageVersion", workers.os, workers."runtimeExtra", workers."sdkVersion", diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 4d68f525d..1ef39e28f 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -262,6 +262,9 @@ type TaskRepository interface { Cleanup(ctx context.Context) (bool, error) GetTaskStats(ctx context.Context, tenantId string) (map[string]TaskStat, error) + + FindOldestRunningTaskInsertedAt(ctx context.Context) (*time.Time, error) + FindOldestTaskInsertedAt(ctx context.Context) (*time.Time, error) } type TaskRepositoryImpl struct { @@ -3861,3 +3864,31 @@ func (r *TaskRepositoryImpl) GetTaskStats(ctx context.Context, tenantId string) return result, nil } + +func (r *TaskRepositoryImpl) FindOldestRunningTaskInsertedAt(ctx context.Context) (*time.Time, error) { + t, err := r.queries.FindOldestRunningTask(ctx, r.pool) + + if err != nil { + return nil, err + } + + if t == nil { + return nil, nil + } + + return &t.TaskInsertedAt.Time, nil +} + +func (r *TaskRepositoryImpl) FindOldestTaskInsertedAt(ctx context.Context) (*time.Time, error) { + t, err := r.queries.FindOldestTask(ctx, r.pool) + + if err != nil { + return nil, err + } + + if t == nil { + return nil, nil + } + + return &t.InsertedAt.Time, nil +} diff --git a/pkg/repository/v1/worker.go b/pkg/repository/v1/worker.go index 365849a4a..9cbb36117 100644 --- a/pkg/repository/v1/worker.go +++ b/pkg/repository/v1/worker.go @@ -18,6 +18,9 @@ type WorkerRepository interface { ListWorkers(tenantId string, opts *repository.ListWorkersOpts) ([]*sqlcv1.ListWorkersWithSlotCountRow, error) GetWorkerById(workerId string) (*sqlcv1.GetWorkerByIdRow, error) ListWorkerState(tenantId, workerId string, maxRuns int) ([]*sqlcv1.ListSemaphoreSlotsWithStateForWorkerRow, []*dbsqlc.GetStepRunForEngineRow, error) + CountActiveSlotsPerTenant() (map[string]int64, error) + CountActiveWorkersPerTenant() (map[string]int64, error) + ListActiveSDKsPerTenant() (map[TenantIdSDKTuple]int64, error) } type workerRepository struct { @@ -89,3 +92,74 @@ func (w *workerRepository) ListWorkerState(tenantId, workerId string, maxRuns in return slots, []*dbsqlc.GetStepRunForEngineRow{}, nil } + +func (w *workerRepository) CountActiveSlotsPerTenant() (map[string]int64, error) { + slots, err := w.queries.ListTotalActiveSlotsPerTenant(context.Background(), w.pool) + + if err != nil { + return nil, fmt.Errorf("could not list active slots per tenant: %w", err) + } + + tenantToSlots := make(map[string]int64) + + for _, slot := range slots { + tenantToSlots[slot.TenantId.String()] = slot.TotalActiveSlots + } + + return tenantToSlots, nil +} + +type SDK struct { + OperatingSystem string + Language string + LanguageVersion string + SdkVersion string +} + +type TenantIdSDKTuple struct { + TenantId string + SDK SDK +} + +func (w *workerRepository) ListActiveSDKsPerTenant() (map[TenantIdSDKTuple]int64, error) { + sdks, err := w.queries.ListActiveSDKsPerTenant(context.Background(), w.pool) + + if err != nil { + return nil, fmt.Errorf("could not list active sdks per tenant: %w", err) + } + + tenantIdSDKTupleToCount := make(map[TenantIdSDKTuple]int64) + + for _, sdk := range sdks { + tenantId := sdk.TenantId.String() + tenantIdSdkTuple := TenantIdSDKTuple{ + TenantId: tenantId, + SDK: SDK{ + OperatingSystem: sdk.Os, + Language: sdk.Language, + LanguageVersion: sdk.LanguageVersion, + SdkVersion: sdk.SdkVersion, + }, + } + + tenantIdSDKTupleToCount[tenantIdSdkTuple] = sdk.Count + } + + return tenantIdSDKTupleToCount, nil +} + +func (w *workerRepository) CountActiveWorkersPerTenant() (map[string]int64, error) { + workers, err := w.queries.ListActiveWorkersPerTenant(context.Background(), w.pool) + + if err != nil { + return nil, fmt.Errorf("could not list active workers per tenant: %w", err) + } + + tenantToWorkers := make(map[string]int64) + + for _, worker := range workers { + tenantToWorkers[worker.TenantId.String()] = worker.Count + } + + return tenantToWorkers, nil +} diff --git a/pkg/telemetry/metrics.go b/pkg/telemetry/metrics.go new file mode 100644 index 000000000..2225b3756 --- /dev/null +++ b/pkg/telemetry/metrics.go @@ -0,0 +1,236 @@ +package telemetry + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// MetricsRecorder provides a centralized way to record OTel metrics +type MetricsRecorder struct { + meter metric.Meter + + // Database health metrics + dbBloatGauge metric.Int64Gauge + dbBloatPercentGauge metric.Float64Gauge + dbLongRunningQueriesGauge metric.Int64Gauge + dbQueryCacheHitRatioGauge metric.Float64Gauge + dbLongRunningVacuumGauge metric.Int64Gauge + dbLastAutovacuumSecondsSinceGauge metric.Float64Gauge + + // OLAP metrics + olapTempTableSizeDAGGauge metric.Int64Gauge + olapTempTableSizeTaskGauge metric.Int64Gauge + yesterdayRunCountGauge metric.Int64Gauge + + // Worker metrics + activeSlotsGauge metric.Int64Gauge + activeWorkersGauge metric.Int64Gauge + activeSDKsGauge metric.Int64Gauge +} + +// NewMetricsRecorder creates a new metrics recorder with all instruments registered +func NewMetricsRecorder(ctx context.Context) (*MetricsRecorder, error) { + meter := otel.Meter("hatchet.run/metrics") + + // Database health metrics + dbBloatGauge, err := meter.Int64Gauge( + "hatchet.db.bloat.count", + metric.WithDescription("Number of bloated tables detected in the database"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create db bloat gauge: %w", err) + } + + dbBloatPercentGauge, err := meter.Float64Gauge( + "hatchet.db.bloat.dead_tuple_percent", + metric.WithDescription("Percentage of dead tuples per table"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create db bloat percent gauge: %w", err) + } + + dbLongRunningQueriesGauge, err := meter.Int64Gauge( + "hatchet.db.long_running_queries.count", + metric.WithDescription("Number of long-running queries detected in the database"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create long running queries gauge: %w", err) + } + + dbQueryCacheHitRatioGauge, err := meter.Float64Gauge( + "hatchet.db.query_cache.hit_ratio", + metric.WithDescription("Query cache hit ratio percentage for tables"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create query cache hit ratio gauge: %w", err) + } + + dbLongRunningVacuumGauge, err := meter.Int64Gauge( + "hatchet.db.long_running_vacuum.count", + metric.WithDescription("Number of long-running vacuum operations detected in the database"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create long running vacuum gauge: %w", err) + } + + dbLastAutovacuumSecondsSinceGauge, err := meter.Float64Gauge( + "hatchet.db.last_autovacuum.seconds_since", + metric.WithDescription("Seconds since last autovacuum for partitioned tables"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create last autovacuum gauge: %w", err) + } + + // OLAP metrics (instance-wide) + olapTempTableSizeDAGGauge, err := meter.Int64Gauge( + "hatchet.olap.temp_table_size.dag_status_updates", + metric.WithDescription("Size of temporary table for DAG status updates (instance-wide)"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create OLAP DAG temp table size gauge: %w", err) + } + + olapTempTableSizeTaskGauge, err := meter.Int64Gauge( + "hatchet.olap.temp_table_size.task_status_updates", + metric.WithDescription("Size of temporary table for task status updates (instance-wide)"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create OLAP task temp table size gauge: %w", err) + } + + yesterdayRunCountGauge, err := meter.Int64Gauge( + "hatchet.olap.yesterday_run_count", + metric.WithDescription("Number of workflow runs from yesterday by status (instance-wide)"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create yesterday run count gauge: %w", err) + } + + // Worker metrics + activeSlotsGauge, err := meter.Int64Gauge( + "hatchet.workers.active_slots", + metric.WithDescription("Number of active worker slots per tenant"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create active slots gauge: %w", err) + } + + activeWorkersGauge, err := meter.Int64Gauge( + "hatchet.workers.active_count", + metric.WithDescription("Number of active workers per tenant"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create active workers gauge: %w", err) + } + + activeSDKsGauge, err := meter.Int64Gauge( + "hatchet.workers.active_sdks", + metric.WithDescription("Number of active SDKs per tenant and SDK version"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create active SDKs gauge: %w", err) + } + + return &MetricsRecorder{ + meter: meter, + dbBloatGauge: dbBloatGauge, + dbBloatPercentGauge: dbBloatPercentGauge, + dbLongRunningQueriesGauge: dbLongRunningQueriesGauge, + dbQueryCacheHitRatioGauge: dbQueryCacheHitRatioGauge, + dbLongRunningVacuumGauge: dbLongRunningVacuumGauge, + dbLastAutovacuumSecondsSinceGauge: dbLastAutovacuumSecondsSinceGauge, + olapTempTableSizeDAGGauge: olapTempTableSizeDAGGauge, + olapTempTableSizeTaskGauge: olapTempTableSizeTaskGauge, + yesterdayRunCountGauge: yesterdayRunCountGauge, + activeSlotsGauge: activeSlotsGauge, + activeWorkersGauge: activeWorkersGauge, + activeSDKsGauge: activeSDKsGauge, + }, nil +} + +// RecordDBBloat records the number of bloated tables detected +func (m *MetricsRecorder) RecordDBBloat(ctx context.Context, count int64, healthStatus string) { + m.dbBloatGauge.Record(ctx, count, + metric.WithAttributes(attribute.String("health_status", healthStatus))) +} + +// RecordDBBloatPercent records the dead tuple percentage for a specific table +func (m *MetricsRecorder) RecordDBBloatPercent(ctx context.Context, tableName string, deadPercent float64) { + m.dbBloatPercentGauge.Record(ctx, deadPercent, + metric.WithAttributes(attribute.String("table_name", tableName))) +} + +// RecordDBLongRunningQueries records the number of long-running queries +func (m *MetricsRecorder) RecordDBLongRunningQueries(ctx context.Context, count int64) { + m.dbLongRunningQueriesGauge.Record(ctx, count) +} + +// RecordDBQueryCacheHitRatio records the query cache hit ratio for a table +func (m *MetricsRecorder) RecordDBQueryCacheHitRatio(ctx context.Context, tableName string, hitRatio float64) { + m.dbQueryCacheHitRatioGauge.Record(ctx, hitRatio, + metric.WithAttributes(attribute.String("table_name", tableName))) +} + +// RecordDBLongRunningVacuum records the number of long-running vacuum operations +func (m *MetricsRecorder) RecordDBLongRunningVacuum(ctx context.Context, count int64, healthStatus string) { + m.dbLongRunningVacuumGauge.Record(ctx, count, + metric.WithAttributes(attribute.String("health_status", healthStatus))) +} + +// RecordDBLastAutovacuumSecondsSince records seconds since last autovacuum for a partitioned table +func (m *MetricsRecorder) RecordDBLastAutovacuumSecondsSince(ctx context.Context, tableName string, seconds float64) { + m.dbLastAutovacuumSecondsSinceGauge.Record(ctx, seconds, + metric.WithAttributes(attribute.String("table_name", tableName))) +} + +// RecordOLAPTempTableSizeDAG records the size of the OLAP DAG status updates temp table (instance-wide) +func (m *MetricsRecorder) RecordOLAPTempTableSizeDAG(ctx context.Context, size int64) { + m.olapTempTableSizeDAGGauge.Record(ctx, size) +} + +// RecordOLAPTempTableSizeTask records the size of the OLAP task status updates temp table (instance-wide) +func (m *MetricsRecorder) RecordOLAPTempTableSizeTask(ctx context.Context, size int64) { + m.olapTempTableSizeTaskGauge.Record(ctx, size) +} + +// RecordYesterdayRunCount records the number of workflow runs from yesterday (instance-wide) +func (m *MetricsRecorder) RecordYesterdayRunCount(ctx context.Context, status string, count int64) { + m.yesterdayRunCountGauge.Record(ctx, count, + metric.WithAttributes(attribute.String("status", status))) +} + +// RecordActiveSlots records the number of active worker slots +func (m *MetricsRecorder) RecordActiveSlots(ctx context.Context, tenantId string, count int64) { + m.activeSlotsGauge.Record(ctx, count, + metric.WithAttributes(attribute.String("tenant_id", tenantId))) +} + +// RecordActiveWorkers records the number of active workers +func (m *MetricsRecorder) RecordActiveWorkers(ctx context.Context, tenantId string, count int64) { + m.activeWorkersGauge.Record(ctx, count, + metric.WithAttributes(attribute.String("tenant_id", tenantId))) +} + +// RecordActiveSDKs records the number of active SDKs +func (m *MetricsRecorder) RecordActiveSDKs(ctx context.Context, tenantId string, sdk SDKInfo, count int64) { + m.activeSDKsGauge.Record(ctx, count, + metric.WithAttributes( + attribute.String("tenant_id", tenantId), + attribute.String("sdk_language", sdk.Language), + attribute.String("sdk_version", sdk.SdkVersion), + attribute.String("sdk_os", sdk.OperatingSystem), + attribute.String("sdk_language_version", sdk.LanguageVersion), + )) +} + +// SDKInfo contains information about an SDK +type SDKInfo struct { + OperatingSystem string + Language string + LanguageVersion string + SdkVersion string +} diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index bb891b58a..3d186d5e0 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -11,9 +11,11 @@ import ( "github.com/google/uuid" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" @@ -106,6 +108,89 @@ func InitTracer(opts *TracerOpts) (func(context.Context) error, error) { return exporter.Shutdown, nil } +func InitMeter(opts *TracerOpts) (func(context.Context) error, error) { + if opts.CollectorURL == "" { + // no-op + return func(context.Context) error { + return nil + }, nil + } + + var secureOption otlpmetricgrpc.Option + + if !opts.Insecure { + secureOption = otlpmetricgrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, "")) + } else { + secureOption = otlpmetricgrpc.WithInsecure() + } + + exporter, err := otlpmetricgrpc.New( + context.Background(), + secureOption, + otlpmetricgrpc.WithEndpoint(opts.CollectorURL), + otlpmetricgrpc.WithHeaders(map[string]string{ + "Authorization": opts.CollectorAuth, + }), + ) + + if err != nil { + return nil, fmt.Errorf("failed to create exporter: %w", err) + } + + resourceAttrs := []attribute.KeyValue{ + attribute.String("service.name", opts.ServiceName), + attribute.String("library.language", "go"), + } + + // Add Kubernetes pod information if available + if podName := os.Getenv("K8S_POD_NAME"); podName != "" { + resourceAttrs = append(resourceAttrs, attribute.String("k8s.pod.name", podName)) + } + if podNamespace := os.Getenv("K8S_POD_NAMESPACE"); podNamespace != "" { + resourceAttrs = append(resourceAttrs, attribute.String("k8s.namespace.name", podNamespace)) + } + + resources, err := resource.New( + context.Background(), + resource.WithAttributes(resourceAttrs...), + ) + + if err != nil { + return nil, fmt.Errorf("failed to set resources: %w", err) + } + + meterProvider := metric.NewMeterProvider( + metric.WithReader( + metric.NewPeriodicReader( + exporter, + metric.WithInterval(3*time.Second), + ), + ), + metric.WithResource(resources), + ) + + otel.SetMeterProvider( + meterProvider, + ) + + return func(ctx context.Context) error { + var shutdownErr error + + if err := meterProvider.Shutdown(ctx); err != nil { + shutdownErr = fmt.Errorf("failed to shutdown meter provider: %w", err) + } + + if err := exporter.Shutdown(ctx); err != nil { + if shutdownErr != nil { + shutdownErr = fmt.Errorf("%v; failed to shutdown exporter: %w", shutdownErr, err) + } else { + shutdownErr = fmt.Errorf("failed to shutdown exporter: %w", err) + } + } + return shutdownErr + }, nil +} + func NewSpan(ctx context.Context, name string) (context.Context, trace.Span) { ctx, span := otel.Tracer("").Start(ctx, prefixSpanKey(name)) return ctx, span diff --git a/sql/schema/pg-stubs.sql b/sql/schema/pg-stubs.sql new file mode 100644 index 000000000..e72c66f97 --- /dev/null +++ b/sql/schema/pg-stubs.sql @@ -0,0 +1,109 @@ +-- these are stub views to satisfy the sqlc generator for pg features + +CREATE VIEW pg_stat_user_tables AS +SELECT + NULL::oid AS relid, + NULL::name AS schemaname, + NULL::name AS relname, + NULL::bigint AS seq_scan, + NULL::bigint AS seq_tup_read, + NULL::bigint AS idx_scan, + NULL::bigint AS idx_tup_fetch, + NULL::bigint AS n_tup_ins, + NULL::bigint AS n_tup_upd, + NULL::bigint AS n_tup_del, + NULL::bigint AS n_tup_hot_upd, + NULL::bigint AS n_live_tup, + NULL::bigint AS n_dead_tup, + NULL::bigint AS n_mod_since_analyze, + NULL::bigint AS n_ins_since_vacuum, + NULL::bigint AS vacuum_count, + NULL::bigint AS autovacuum_count, + NULL::bigint AS analyze_count, + NULL::bigint AS autoanalyze_count, + NULL::timestamp with time zone AS last_vacuum, + NULL::timestamp with time zone AS last_autovacuum, + NULL::timestamp with time zone AS last_analyze, + NULL::timestamp with time zone AS last_autoanalyze +WHERE + false; + +CREATE VIEW pg_statio_user_tables AS +SELECT + NULL::oid AS relid, + NULL::name AS schemaname, + NULL::name AS relname, + NULL::bigint AS heap_blks_read, + NULL::bigint AS heap_blks_hit, + NULL::bigint AS idx_blks_read, + NULL::bigint AS idx_blks_hit, + NULL::bigint AS toast_blks_read, + NULL::bigint AS toast_blks_hit, + NULL::bigint AS tidx_blks_read, + NULL::bigint AS tidx_blks_hit +WHERE + false; + +CREATE VIEW pg_available_extensions AS +SELECT + NULL::name AS name, + NULL::text AS default_version, + NULL::text AS installed_version, + NULL::text AS comment +WHERE + false; + +CREATE VIEW pg_stat_statements AS +SELECT + NULL::oid AS userid, + NULL::oid AS dbid, + NULL::bigint AS queryid, + NULL::text AS query, + NULL::bigint AS calls, + NULL::double precision AS total_exec_time, + NULL::bigint AS rows, + NULL::bigint AS shared_blks_hit, + NULL::bigint AS shared_blks_read, + NULL::bigint AS shared_blks_dirtied, + NULL::bigint AS shared_blks_written, + NULL::bigint AS local_blks_hit, + NULL::bigint AS local_blks_read, + NULL::bigint AS local_blks_dirtied, + NULL::bigint AS local_blks_written, + NULL::bigint AS temp_blks_read, + NULL::bigint AS temp_blks_written, + NULL::double precision AS blk_read_time, + NULL::double precision AS blk_write_time +WHERE + false; + +CREATE VIEW pg_stat_progress_vacuum AS +SELECT + NULL::integer AS pid, + NULL::oid AS datid, + NULL::name AS datname, + NULL::oid AS relid, + NULL::text AS phase, + NULL::bigint AS heap_blks_total, + NULL::bigint AS heap_blks_scanned, + NULL::bigint AS heap_blks_vacuumed, + NULL::bigint AS heap_blks_frozen, + NULL::bigint AS index_vacuum_count, + NULL::bigint AS max_dead_tuples, + NULL::bigint AS num_dead_tuples +WHERE + false; + +CREATE VIEW pg_stat_activity AS +SELECT + NULL::integer AS pid, + NULL::name AS usename, + NULL::text AS application_name, + NULL::inet AS client_addr, + NULL::text AS state, + NULL::timestamp with time zone AS query_start, + NULL::text AS wait_event_type, + NULL::text AS wait_event, + NULL::text AS query +WHERE + false;