Feat: Hatchet Metrics Monitoring, I (#2480)

* feat: queries + task methods for oldest running task and oldest task

* feat: worker slot and sdk metrics

* feat: wal metrics

* repository stub

* feat: add OpenTelemetry meter provider initialization

* pg queries

* fix: add task

* feat: repo methods for worker metrics

* feat: active workers query, fix where clauses

* fix: aliasing

* fix: sql, cleanup

* chore: cast

* feat: olap queries

* feat: olap queries

* feat: finish wiring up olap status update metrics

* chore: lint

* chore: lint

* fix: dupes, other code review comments

* send metrics to OTel collector

* feat: track seconds since last autovacuum for partitioned tables

* flag

* logging updates

* address PR comments

---------

Co-authored-by: gabriel ruttner <gabriel.ruttner@gmail.com>
Co-authored-by: Mohammed Nafees <hello@mnafees.me>
This commit is contained in:
matt
2025-12-22 14:34:02 -05:00
committed by GitHub
parent a4e7584c18
commit fdc075ec6f
33 changed files with 2402 additions and 60 deletions

View File

@@ -249,3 +249,6 @@ tasks:
EXTRA_ARGS: '{{.EXTRA_ARGS | default ""}}'
cmds:
- poetry run pytest -n auto --retries 3 --retry-delay 5
start-telemetry:
cmds:
- docker compose -f docker-compose.infra.yml up -d

View File

@@ -124,7 +124,8 @@ func (w *V1WebhooksService) V1WebhookReceive(ctx echo.Context, request gen.V1Web
* See: https://api.slack.com/interactivity/slash-commands
* For GENERIC webhooks, we convert all form fields directly to the payload map
*/
if webhook.SourceName == sqlcv1.V1IncomingWebhookSourceNameSLACK {
switch webhook.SourceName {
case sqlcv1.V1IncomingWebhookSourceNameSLACK:
payloadValue := formData.Get("payload")
if payloadValue != "" {
/* Interactive components: parse the payload parameter as JSON */
@@ -153,14 +154,14 @@ func (w *V1WebhooksService) V1WebhookReceive(ctx echo.Context, request gen.V1Web
}
}
}
} else if webhook.SourceName == sqlcv1.V1IncomingWebhookSourceNameGENERIC {
case sqlcv1.V1IncomingWebhookSourceNameGENERIC:
/* For GENERIC webhooks, convert all form fields to the payload map */
for key, values := range formData {
if len(values) > 0 {
payloadMap[key] = values[0]
}
}
} else {
default:
/* For other webhook sources, form-encoded data is unexpected - return error */
return gen.V1WebhookReceive400JSONResponse{
Errors: []gen.APIError{
@@ -210,11 +211,13 @@ func (w *V1WebhooksService) V1WebhookReceive(ctx echo.Context, request gen.V1Web
if err != nil {
var errorMsg string
if strings.Contains(err.Error(), "did not evaluate to a string") {
errStr := err.Error()
switch {
case strings.Contains(errStr, "did not evaluate to a string"):
errorMsg = "Event key expression must evaluate to a string"
} else if eventKey == "" {
case eventKey == "":
errorMsg = "Event key evaluated to an empty string"
} else {
default:
errorMsg = "Failed to evaluate event key expression"
}

View File

@@ -354,7 +354,7 @@ export type SeededUser = (typeof seededUsers)[SeededUserKey];
return err
}
if err := os.WriteFile(targetFile, []byte(contents), 0o644); err != nil {
if err := os.WriteFile(targetFile, []byte(contents), 0o600); err != nil {
return err
}

View File

@@ -14,6 +14,7 @@ import (
adminv1 "github.com/hatchet-dev/hatchet/internal/services/admin/v1"
"github.com/hatchet-dev/hatchet/internal/services/controllers/events"
"github.com/hatchet-dev/hatchet/internal/services/controllers/jobs"
metricscontroller "github.com/hatchet-dev/hatchet/internal/services/controllers/metrics"
"github.com/hatchet-dev/hatchet/internal/services/controllers/retention"
"github.com/hatchet-dev/hatchet/internal/services/controllers/v1/olap"
"github.com/hatchet-dev/hatchet/internal/services/controllers/v1/task"
@@ -70,6 +71,18 @@ func init() {
if err != nil {
panic(fmt.Errorf("could not initialize tracer: %w", err))
}
// Initialize the meter provider for metrics
_, err = telemetry.InitMeter(&telemetry.TracerOpts{
ServiceName: svcName,
CollectorURL: collectorURL,
Insecure: insecureBool,
CollectorAuth: collectorAuth,
})
if err != nil {
panic(fmt.Errorf("could not initialize meter: %w", err))
}
}
func Run(ctx context.Context, cf *loader.ConfigLoader, version string) error {
@@ -890,6 +903,31 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
Name: "webhook worker",
Fn: cleanup2,
})
if sc.OpenTelemetry.MetricsEnabled && sc.OpenTelemetry.CollectorURL != "" {
mc, err := metricscontroller.New(
metricscontroller.WithLogger(sc.Logger),
metricscontroller.WithRepository(sc.V1),
metricscontroller.WithAlerter(sc.Alerter),
metricscontroller.WithPartition(p),
metricscontroller.WithIntervals(sc.CronOperations),
)
if err != nil {
return nil, fmt.Errorf("could not create metrics collector: %w", err)
}
cleanupMetrics, err := mc.Start()
if err != nil {
return nil, fmt.Errorf("could not start metrics collector: %w", err)
}
teardown = append(teardown, Teardown{
Name: "metrics collector",
Fn: cleanupMetrics,
})
l.Info().Msg("metrics collector started")
}
}
if sc.HasService("all") || sc.HasService("grpc-api") {

View File

@@ -307,6 +307,7 @@ Variables marked with ⚠️ are conditionally required when specific features a
| `SERVER_OTEL_INSECURE` | Whether to use an insecure connection to the collector URL | |
| `SERVER_OTEL_TRACE_ID_RATIO` | OpenTelemetry trace ID ratio | |
| `SERVER_OTEL_COLLECTOR_AUTH` | OpenTelemetry Collector Authorization header value | |
| `SERVER_OTEL_METRICS_ENABLED` | Enable OpenTelemetry metrics collection | `false` |
| `SERVER_PROMETHEUS_ENABLED` | Enable Prometheus | `false` |
| `SERVER_PROMETHEUS_ADDRESS` | Prometheus address | `:9090` |
| `SERVER_PROMETHEUS_PATH` | Prometheus metrics path | `/metrics` |
@@ -338,36 +339,41 @@ Variables marked with ⚠️ are conditionally required when specific features a
## Cron Operations Configuration
| Variable | Description | Default Value |
| --------------------------------------------------------- | ----------------------------------------------------- | ------------- |
| `SERVER_CRON_OPERATIONS_TASK_ANALYZE_CRON_INTERVAL` | Interval for running ANALYZE on task-related tables | `3h` |
| `SERVER_CRON_OPERATIONS_OLAP_ANALYZE_CRON_INTERVAL` | Interval for running ANALYZE on OLAP/analytics tables | `3h` |
| `SERVER_WAIT_FOR_FLUSH` | Default wait for flush | `1ms` |
| `SERVER_MAX_CONCURRENT` | Default max concurrent | `50` |
| `SERVER_FLUSH_PERIOD_MILLISECONDS` | Default flush period | `10ms` |
| `SERVER_FLUSH_ITEMS_THRESHOLD` | Default flush threshold | `100` |
| `SERVER_FLUSH_STRATEGY` | Default flush strategy | `DYNAMIC` |
| `SERVER_WORKFLOWRUNBUFFER_WAIT_FOR_FLUSH` | Workflow run buffer wait for flush | |
| `SERVER_WORKFLOWRUNBUFFER_MAX_CONCURRENT` | Max concurrent workflow run buffer ops | |
| `SERVER_WORKFLOWRUNBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for workflow run buffer | |
| `SERVER_WORKFLOWRUNBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for workflow run buffer | |
| `SERVER_WORKFLOWRUNBUFFER_FLUSH_STRATEGY` | Flush strategy for workflow run buffer | |
| `SERVER_EVENTBUFFER_WAIT_FOR_FLUSH` | Event buffer wait for flush | |
| `SERVER_EVENTBUFFER_MAX_CONCURRENT` | Max concurrent event buffer ops | |
| `SERVER_EVENTBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for event buffer | |
| `SERVER_EVENTBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for event buffer | |
| `SERVER_EVENTBUFFER_SERIAL_BUFFER` | Event buffer serial mode | |
| `SERVER_EVENTBUFFER_FLUSH_STRATEGY` | Flush strategy for event buffer | |
| `SERVER_RELEASESEMAPHOREBUFFER_WAIT_FOR_FLUSH` | Release semaphore buffer wait for flush | |
| `SERVER_RELEASESEMAPHOREBUFFER_MAX_CONCURRENT` | Max concurrent release semaphore buffer ops | |
| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for release semaphore buffer | |
| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for release semaphore buffer | |
| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_STRATEGY` | Flush strategy for release semaphore buffer | |
| `SERVER_QUEUESTEPRUNBUFFER_WAIT_FOR_FLUSH` | Queue step run buffer wait for flush | |
| `SERVER_QUEUESTEPRUNBUFFER_MAX_CONCURRENT` | Max concurrent queue step run buffer ops | |
| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for queue step run buffer | |
| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for queue step run buffer | |
| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_STRATEGY` | Flush strategy for queue step run buffer | |
| Variable | Description | Default Value |
| --------------------------------------------------------- | --------------------------------------------------------------------- | ------------- |
| `SERVER_CRON_OPERATIONS_TASK_ANALYZE_CRON_INTERVAL` | Interval for running ANALYZE on task-related tables | `3h` |
| `SERVER_CRON_OPERATIONS_OLAP_ANALYZE_CRON_INTERVAL` | Interval for running ANALYZE on OLAP/analytics tables | `3h` |
| `SERVER_CRON_OPERATIONS_DB_HEALTH_METRICS_INTERVAL` | Interval for collecting database health metrics (OTel) | `60s` |
| `SERVER_CRON_OPERATIONS_OLAP_METRICS_INTERVAL` | Interval for collecting OLAP metrics (OTel) | `5m` |
| `SERVER_CRON_OPERATIONS_WORKER_METRICS_INTERVAL` | Interval for collecting worker metrics (OTel) | `60s` |
| `SERVER_CRON_OPERATIONS_YESTERDAY_RUN_COUNT_HOUR` | Hour (0-23) at which to collect yesterday's workflow run count (OTel) | `0` |
| `SERVER_CRON_OPERATIONS_YESTERDAY_RUN_COUNT_MINUTE` | Minute (0-59) at which to collect yesterday's workflow run count | `5` |
| `SERVER_WAIT_FOR_FLUSH` | Default wait for flush | `1ms` |
| `SERVER_MAX_CONCURRENT` | Default max concurrent | `50` |
| `SERVER_FLUSH_PERIOD_MILLISECONDS` | Default flush period | `10ms` |
| `SERVER_FLUSH_ITEMS_THRESHOLD` | Default flush threshold | `100` |
| `SERVER_FLUSH_STRATEGY` | Default flush strategy | `DYNAMIC` |
| `SERVER_WORKFLOWRUNBUFFER_WAIT_FOR_FLUSH` | Workflow run buffer wait for flush | |
| `SERVER_WORKFLOWRUNBUFFER_MAX_CONCURRENT` | Max concurrent workflow run buffer ops | |
| `SERVER_WORKFLOWRUNBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for workflow run buffer | |
| `SERVER_WORKFLOWRUNBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for workflow run buffer | |
| `SERVER_WORKFLOWRUNBUFFER_FLUSH_STRATEGY` | Flush strategy for workflow run buffer | |
| `SERVER_EVENTBUFFER_WAIT_FOR_FLUSH` | Event buffer wait for flush | |
| `SERVER_EVENTBUFFER_MAX_CONCURRENT` | Max concurrent event buffer ops | |
| `SERVER_EVENTBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for event buffer | |
| `SERVER_EVENTBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for event buffer | |
| `SERVER_EVENTBUFFER_SERIAL_BUFFER` | Event buffer serial mode | |
| `SERVER_EVENTBUFFER_FLUSH_STRATEGY` | Flush strategy for event buffer | |
| `SERVER_RELEASESEMAPHOREBUFFER_WAIT_FOR_FLUSH` | Release semaphore buffer wait for flush | |
| `SERVER_RELEASESEMAPHOREBUFFER_MAX_CONCURRENT` | Max concurrent release semaphore buffer ops | |
| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for release semaphore buffer | |
| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for release semaphore buffer | |
| `SERVER_RELEASESEMAPHOREBUFFER_FLUSH_STRATEGY` | Flush strategy for release semaphore buffer | |
| `SERVER_QUEUESTEPRUNBUFFER_WAIT_FOR_FLUSH` | Queue step run buffer wait for flush | |
| `SERVER_QUEUESTEPRUNBUFFER_MAX_CONCURRENT` | Max concurrent queue step run buffer ops | |
| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_PERIOD_MILLISECONDS` | Flush period for queue step run buffer | |
| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_ITEMS_THRESHOLD` | Items threshold for queue step run buffer | |
| `SERVER_QUEUESTEPRUNBUFFER_FLUSH_STRATEGY` | Flush strategy for queue step run buffer | |
## OLAP Database Configuration

2
go.mod
View File

@@ -37,9 +37,11 @@ require (
go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.64.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.64.0
go.opentelemetry.io/otel v1.39.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0
go.opentelemetry.io/otel/sdk v1.39.0
go.opentelemetry.io/otel/sdk/metric v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
go.uber.org/goleak v1.3.0
golang.org/x/time v0.14.0

2
go.sum
View File

@@ -413,6 +413,8 @@ go.opentelemetry.io/contrib/propagators/b3 v1.39.0 h1:PI7pt9pkSnimWcp5sQhUA9OzLb
go.opentelemetry.io/contrib/propagators/b3 v1.39.0/go.mod h1:5gV/EzPnfYIwjzj+6y8tbGW2PKWhcsz5e/7twptRVQY=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0 h1:cEf8jF6WbuGQWUVcqgyWtTR0kOOAWY1DYZ+UhvdmQPw=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0/go.mod h1:k1lzV5n5U3HkGvTCJHraTAGJ7MqsgL1wrGwTj1Isfiw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 h1:f0cb2XPmrqn4XMy9PNliTgRKJgS5WcL/u0/WRYGz4t0=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0/go.mod h1:vnakAaFckOMiMtOIhFI2MNH4FYrZzXCYxmb1LlhoGz8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 h1:in9O8ESIOlwJAEGTkkf34DesGRAc/Pn8qJ7k3r/42LM=

View File

@@ -455,18 +455,6 @@ func (t *MessageQueueImpl) pubMessage(ctx context.Context, q msgqueue.Queue, msg
return nil
}
func getMessageSize(m *msgqueue.Message) int {
payloadSize := getPayloadSize(m.Payloads)
size := payloadSize + len(m.TenantID) + len(m.ID) + 4 // 4 bytes for other fields
for k, v := range m.OtelCarrier {
size += len(k) + len(v)
}
return size
}
// Subscribe subscribes to the msg queue.
func (t *MessageQueueImpl) Subscribe(
q msgqueue.Queue,

View File

@@ -0,0 +1,508 @@
package metrics
import (
"context"
"fmt"
"time"
"github.com/go-co-op/gocron/v2"
"github.com/rs/zerolog"
"github.com/hatchet-dev/hatchet/internal/services/partition"
"github.com/hatchet-dev/hatchet/pkg/config/server"
hatcheterrors "github.com/hatchet-dev/hatchet/pkg/errors"
"github.com/hatchet-dev/hatchet/pkg/logger"
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
"github.com/hatchet-dev/hatchet/pkg/telemetry"
)
// MetricsCollector collects and reports database and system metrics to OTel
type MetricsCollector interface {
	// Start schedules the periodic collection jobs and returns a cleanup
	// function that shuts the scheduler down.
	Start() (func() error, error)
}

// MetricsCollectorImpl is the default MetricsCollector implementation,
// driving periodic collection jobs with a gocron scheduler.
type MetricsCollectorImpl struct {
	l        *zerolog.Logger            // service-scoped logger
	repo     v1.Repository              // repository for health/OLAP/worker queries
	recorder *telemetry.MetricsRecorder // OTel metrics recorder
	s        gocron.Scheduler           // scheduler driving the periodic jobs
	a        *hatcheterrors.Wrapped     // wrapped alerter for error reporting
	p        *partition.Partition       // gates collection to the instance owning the internal tenant

	// collection intervals for the three recurring jobs
	dbHealthInterval time.Duration
	olapInterval     time.Duration
	workerInterval   time.Duration

	// daily UTC time at which yesterday's run counts are collected
	yesterdayRunCountHour   uint
	yesterdayRunCountMinute uint
}

// MetricsCollectorOpt mutates MetricsCollectorOpts; pass to New.
type MetricsCollectorOpt func(*MetricsCollectorOpts)

// MetricsCollectorOpts holds the configurable dependencies and schedule
// settings for a MetricsCollectorImpl.
type MetricsCollectorOpts struct {
	l       *zerolog.Logger
	repo    v1.Repository
	alerter hatcheterrors.Alerter
	p       *partition.Partition

	dbHealthInterval time.Duration
	olapInterval     time.Duration
	workerInterval   time.Duration

	yesterdayRunCountHour   uint
	yesterdayRunCountMinute uint
}
// defaultMetricsCollectorOpts returns the baseline option set: a default
// logger, a no-op alerter, and the stock collection intervals/times.
func defaultMetricsCollectorOpts() *MetricsCollectorOpts {
	defaultLogger := logger.NewDefaultLogger("metrics-collector")

	opts := &MetricsCollectorOpts{
		l:                       &defaultLogger,
		alerter:                 hatcheterrors.NoOpAlerter{},
		dbHealthInterval:        time.Minute,
		olapInterval:            5 * time.Minute,
		workerInterval:          time.Minute,
		yesterdayRunCountHour:   0,
		yesterdayRunCountMinute: 5,
	}

	return opts
}
// WithLogger overrides the default logger.
func WithLogger(l *zerolog.Logger) MetricsCollectorOpt {
	return func(opts *MetricsCollectorOpts) {
		opts.l = l
	}
}
// WithRepository sets the repository used for metrics queries. Required:
// New returns an error when no repository is provided.
func WithRepository(r v1.Repository) MetricsCollectorOpt {
	return func(opts *MetricsCollectorOpts) {
		opts.repo = r
	}
}
// WithAlerter replaces the default no-op alerter used for error reporting.
func WithAlerter(a hatcheterrors.Alerter) MetricsCollectorOpt {
	return func(opts *MetricsCollectorOpts) {
		opts.alerter = a
	}
}
// WithPartition sets the partition used to restrict collection to a single
// engine instance. Required: New returns an error when no partition is
// provided.
func WithPartition(p *partition.Partition) MetricsCollectorOpt {
	return func(opts *MetricsCollectorOpts) {
		opts.p = p
	}
}
// WithIntervals copies the metrics collection intervals and the daily
// run-count collection time from the cron operations config file.
func WithIntervals(config server.CronOperationsConfigFile) MetricsCollectorOpt {
	return func(opts *MetricsCollectorOpts) {
		opts.dbHealthInterval = config.DBHealthMetricsInterval
		opts.olapInterval = config.OLAPMetricsInterval
		opts.workerInterval = config.WorkerMetricsInterval
		opts.yesterdayRunCountHour = config.YesterdayRunCountHour
		opts.yesterdayRunCountMinute = config.YesterdayRunCountMinute
	}
}
// New constructs a MetricsCollectorImpl from the supplied functional options.
// A repository and a partition are mandatory; everything else falls back to
// the defaults from defaultMetricsCollectorOpts. It also creates the OTel
// metrics recorder and a UTC-based scheduler.
func New(fs ...MetricsCollectorOpt) (*MetricsCollectorImpl, error) {
	opts := defaultMetricsCollectorOpts()

	for _, apply := range fs {
		apply(opts)
	}

	switch {
	case opts.repo == nil:
		return nil, fmt.Errorf("repository is required. use WithRepository")
	case opts.p == nil:
		return nil, fmt.Errorf("partition is required. use WithPartition")
	}

	// Re-scope the logger to this service.
	scopedLogger := opts.l.With().Str("service", "metrics-collector").Logger()
	opts.l = &scopedLogger

	recorder, err := telemetry.NewMetricsRecorder(context.Background())
	if err != nil {
		return nil, fmt.Errorf("could not create metrics recorder: %w", err)
	}

	scheduler, err := gocron.NewScheduler(gocron.WithLocation(time.UTC))
	if err != nil {
		return nil, fmt.Errorf("could not create scheduler: %w", err)
	}

	wrapped := hatcheterrors.NewWrapped(opts.alerter)
	wrapped.WithData(map[string]interface{}{"service": "metrics-collector"})

	mc := &MetricsCollectorImpl{
		l:                       opts.l,
		repo:                    opts.repo,
		recorder:                recorder,
		s:                       scheduler,
		a:                       wrapped,
		p:                       opts.p,
		dbHealthInterval:        opts.dbHealthInterval,
		olapInterval:            opts.olapInterval,
		workerInterval:          opts.workerInterval,
		yesterdayRunCountHour:   opts.yesterdayRunCountHour,
		yesterdayRunCountMinute: opts.yesterdayRunCountMinute,
	}

	return mc, nil
}
// Start launches the scheduler and registers the periodic collection jobs:
// database health, OLAP, and worker metrics at their configured intervals,
// plus a once-daily job for yesterday's run counts at the configured UTC
// time. It returns a cleanup function that shuts the scheduler down.
func (mc *MetricsCollectorImpl) Start() (func() error, error) {
	mc.s.Start()

	ctx := context.Background()

	// The three interval-driven jobs differ only by interval, task, and name,
	// so register them through a shared helper instead of repeating the
	// NewJob boilerplate three times.
	intervalJobs := []struct {
		interval time.Duration
		task     func()
		name     string
	}{
		{mc.dbHealthInterval, mc.collectDatabaseHealthMetrics(ctx), "database health metrics collection"},
		{mc.olapInterval, mc.collectOLAPMetrics(ctx), "OLAP metrics collection"},
		{mc.workerInterval, mc.collectWorkerMetrics(ctx), "worker metrics collection"},
	}

	for _, job := range intervalJobs {
		if err := mc.scheduleIntervalJob(job.interval, job.task, job.name); err != nil {
			return nil, err
		}
	}

	// Collect yesterday's run count once per day.
	_, err := mc.s.NewJob(
		gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(mc.yesterdayRunCountHour, mc.yesterdayRunCountMinute, 0))),
		gocron.NewTask(mc.collectYesterdayRunCounts(ctx)),
		gocron.WithSingletonMode(gocron.LimitModeReschedule),
	)
	if err != nil {
		return nil, fmt.Errorf("could not schedule yesterday run counts collection: %w", err)
	}

	mc.l.Info().Uint("hour", mc.yesterdayRunCountHour).Uint("minute", mc.yesterdayRunCountMinute).Msg("scheduled yesterday run counts collection")

	cleanup := func() error {
		if err := mc.s.Shutdown(); err != nil {
			return fmt.Errorf("could not shutdown scheduler: %w", err)
		}
		return nil
	}

	return cleanup, nil
}

// scheduleIntervalJob registers a singleton-mode job (missed runs are
// rescheduled, never stacked) that runs task every interval, logging the
// registration on success. name is used in both the error and the log line.
func (mc *MetricsCollectorImpl) scheduleIntervalJob(interval time.Duration, task func(), name string) error {
	_, err := mc.s.NewJob(
		gocron.DurationJob(interval),
		gocron.NewTask(task),
		gocron.WithSingletonMode(gocron.LimitModeReschedule),
	)
	if err != nil {
		return fmt.Errorf("could not schedule %s: %w", name, err)
	}

	mc.l.Info().Str("interval", interval.String()).Msg("scheduled " + name)

	return nil
}
// collectDatabaseHealthMetrics returns the job body that probes Postgres
// health and records the results via the OTel metrics recorder. Probes:
// track_counts setting, table bloat (aggregate and per-table dead-tuple
// percentages), long-running queries, per-table query cache hit ratios,
// long-running vacuums, and seconds since last autovacuum for partitioned
// tables in both the OLAP and core databases. Each probe logs and continues
// on failure so one failing check does not block the others.
func (mc *MetricsCollectorImpl) collectDatabaseHealthMetrics(ctx context.Context) func() {
	return func() {
		ctx, span := telemetry.NewSpan(ctx, "collect database_health_metrics")
		defer span.End()

		// Only run on the engine instance that has control over the internal tenant
		tenant, err := mc.p.GetInternalTenantForController(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("could not get internal tenant")
			return
		}

		if tenant == nil {
			// This engine instance doesn't have control over the internal tenant
			return
		}

		mc.l.Debug().Msg("collecting database health metrics")

		// Check if track_counts is enabled; stats-based checks below need it,
		// so log loudly when it is off (but keep going).
		trackCountsEnabled, err := mc.repo.PGHealth().TrackCountsEnabled(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("failed to check track_counts setting")
		} else if !trackCountsEnabled {
			mc.l.Error().Msg("track_counts is disabled - database health metrics require track_counts = on. Run 'ALTER SYSTEM SET track_counts = on; SELECT pg_reload_conf();' to enable it.")
		}

		// Check bloat (aggregate count plus a status label)
		bloatStatus, bloatCount, err := mc.repo.PGHealth().CheckBloat(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("failed to check database bloat")
		} else {
			mc.recorder.RecordDBBloat(ctx, int64(bloatCount), string(bloatStatus))
			mc.l.Debug().Int("count", bloatCount).Str("status", string(bloatStatus)).Msg("recorded database bloat metric")
		}

		// Get detailed bloat metrics per table; rows whose dead-tuple
		// percentage is null (or fails conversion) are silently skipped.
		bloatDetails, err := mc.repo.PGHealth().GetBloatDetails(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("failed to get bloat details")
		} else if len(bloatDetails) > 0 {
			mc.l.Info().Int("table_count", len(bloatDetails)).Msg("recording bloat details per table")
			for _, row := range bloatDetails {
				if row.DeadPct.Valid {
					deadPct, err := row.DeadPct.Float64Value()
					if err == nil {
						tableName := row.Tablename.String
						mc.recorder.RecordDBBloatPercent(ctx, tableName, deadPct.Float64)
						mc.l.Debug().
							Str("table", tableName).
							Float64("dead_pct", deadPct.Float64).
							Msg("recorded bloat percent metric")
					}
				}
			}
		}

		// Check long-running queries (only the count is recorded; the query
		// details returned in the first value are intentionally discarded)
		_, longRunningCount, err := mc.repo.PGHealth().CheckLongRunningQueries(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("failed to check long-running queries")
		} else {
			mc.recorder.RecordDBLongRunningQueries(ctx, int64(longRunningCount))
			mc.l.Debug().Int("count", longRunningCount).Msg("recorded long-running queries metric")
		}

		// Check query cache hit ratios per table
		tables, err := mc.repo.PGHealth().CheckQueryCaches(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("failed to check query cache")
		} else if len(tables) == 0 {
			mc.l.Info().Msg("no query cache data available (pg_stat_statements may not be enabled or track_counts may be disabled)")
		} else {
			mc.l.Info().Int("table_count", len(tables)).Msg("recording query cache hit ratios")
			for _, table := range tables {
				tableName := table.Tablename.String
				hitRatio := table.CacheHitRatioPct
				mc.recorder.RecordDBQueryCacheHitRatio(ctx, tableName, hitRatio)
				mc.l.Debug().
					Str("table", tableName).
					Float64("hit_ratio", hitRatio).
					Msg("recorded query cache hit ratio metric")
			}
		}

		// Check long-running vacuum
		vacuumStatus, vacuumCount, err := mc.repo.PGHealth().CheckLongRunningVacuum(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("failed to check long-running vacuum")
		} else {
			mc.recorder.RecordDBLongRunningVacuum(ctx, int64(vacuumCount), string(vacuumStatus))
			mc.l.Debug().Int("count", vacuumCount).Str("status", string(vacuumStatus)).Msg("recorded long-running vacuum metric")
		}

		// Seconds since last autovacuum for partitioned tables (OLAP DB).
		// validCount tracks how many rows produced a metric so we can warn
		// when tables exist but none have been autovacuumed yet.
		autovacuumRows, err := mc.repo.PGHealth().CheckLastAutovacuumForPartitionedTables(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("failed to check last autovacuum for partitioned tables (OLAP DB)")
		} else if len(autovacuumRows) == 0 {
			mc.l.Warn().Msg("no partitioned tables found for autovacuum tracking (OLAP DB)")
		} else {
			mc.l.Info().Int("table_count", len(autovacuumRows)).Msg("recording last autovacuum metrics (OLAP DB)")
			validCount := 0
			for _, row := range autovacuumRows {
				if row.SecondsSinceLastAutovacuum.Valid {
					seconds, err := row.SecondsSinceLastAutovacuum.Float64Value()
					if err == nil {
						tableName := row.Tablename.String
						mc.recorder.RecordDBLastAutovacuumSecondsSince(ctx, tableName, seconds.Float64)
						mc.l.Debug().
							Str("table", tableName).
							Float64("seconds_since", seconds.Float64).
							Msg("recorded last autovacuum metric (OLAP DB)")
						validCount++
					}
				}
			}
			if validCount == 0 {
				mc.l.Warn().Int("table_count", len(autovacuumRows)).Msg("found partitioned tables but none have been autovacuumed yet (OLAP DB)")
			}
		}

		// Same autovacuum check against the core database.
		autovacuumRowsCoreDB, err := mc.repo.PGHealth().CheckLastAutovacuumForPartitionedTablesCoreDB(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("failed to check last autovacuum for partitioned tables (CORE DB)")
		} else if len(autovacuumRowsCoreDB) == 0 {
			mc.l.Warn().Msg("no partitioned tables found for autovacuum tracking (CORE DB)")
		} else {
			mc.l.Info().Int("table_count", len(autovacuumRowsCoreDB)).Msg("recording last autovacuum metrics (CORE DB)")
			validCount := 0
			for _, row := range autovacuumRowsCoreDB {
				if row.SecondsSinceLastAutovacuum.Valid {
					seconds, err := row.SecondsSinceLastAutovacuum.Float64Value()
					if err == nil {
						tableName := row.Tablename.String
						mc.recorder.RecordDBLastAutovacuumSecondsSince(ctx, tableName, seconds.Float64)
						mc.l.Debug().
							Str("table", tableName).
							Float64("seconds_since", seconds.Float64).
							Msg("recorded last autovacuum metric (CORE DB)")
						validCount++
					}
				}
			}
			if validCount == 0 {
				mc.l.Warn().Int("table_count", len(autovacuumRowsCoreDB)).Msg("found partitioned tables but none have been autovacuumed yet (CORE DB)")
			}
		}

		mc.l.Debug().Msg("finished collecting database health metrics")
	}
}
// collectOLAPMetrics returns the job body that records the sizes of the OLAP
// status-update temp tables (DAG and task) for this instance.
func (mc *MetricsCollectorImpl) collectOLAPMetrics(ctx context.Context) func() {
	return func() {
		ctx, span := telemetry.NewSpan(ctx, "collect olap_metrics")
		defer span.End()

		// Only run on the engine instance that has control over the internal tenant
		internalTenant, err := mc.p.GetInternalTenantForController(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("could not get internal tenant")
			return
		}

		if internalTenant == nil {
			// This engine instance doesn't have control over the internal tenant
			return
		}

		mc.l.Debug().Msg("collecting OLAP metrics")

		// Count DAG status updates temp table size (instance-wide)
		if dagTableSize, sizeErr := mc.repo.OLAP().CountOLAPTempTableSizeForDAGStatusUpdates(ctx); sizeErr != nil {
			mc.l.Error().Err(sizeErr).Msg("failed to count DAG temp table size")
		} else {
			mc.recorder.RecordOLAPTempTableSizeDAG(ctx, dagTableSize)
			mc.l.Debug().Int64("size", dagTableSize).Msg("recorded DAG temp table size metric")
		}

		// Count task status updates temp table size (instance-wide)
		if taskTableSize, sizeErr := mc.repo.OLAP().CountOLAPTempTableSizeForTaskStatusUpdates(ctx); sizeErr != nil {
			mc.l.Error().Err(sizeErr).Msg("failed to count task temp table size")
		} else {
			mc.recorder.RecordOLAPTempTableSizeTask(ctx, taskTableSize)
			mc.l.Debug().Int64("size", taskTableSize).Msg("recorded task temp table size metric")
		}

		mc.l.Debug().Msg("finished collecting OLAP metrics")
	}
}
// collectYesterdayRunCounts returns the job body that records yesterday's
// workflow run counts broken down by readable status.
func (mc *MetricsCollectorImpl) collectYesterdayRunCounts(ctx context.Context) func() {
	return func() {
		ctx, span := telemetry.NewSpan(ctx, "collect yesterday_run_counts")
		defer span.End()

		// Only run on the engine instance that has control over the internal tenant
		internalTenant, err := mc.p.GetInternalTenantForController(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("could not get internal tenant")
			return
		}

		if internalTenant == nil {
			// This engine instance doesn't have control over the internal tenant
			return
		}

		mc.l.Debug().Msg("collecting yesterday's run counts")

		// Get yesterday's run counts by status (instance-wide)
		countsByStatus, err := mc.repo.OLAP().ListYesterdayRunCountsByStatus(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("failed to get yesterday's run counts")
			return
		}

		for runStatus, runCount := range countsByStatus {
			mc.recorder.RecordYesterdayRunCount(ctx, string(runStatus), runCount)
			mc.l.Debug().Str("status", string(runStatus)).Int64("count", runCount).Msg("recorded yesterday run count metric")
		}

		mc.l.Debug().Msg("finished collecting yesterday's run counts")
	}
}
// collectWorkerMetrics returns the job body that records per-tenant worker
// fleet metrics: active slots, active workers, and active SDK versions.
func (mc *MetricsCollectorImpl) collectWorkerMetrics(ctx context.Context) func() {
	return func() {
		ctx, span := telemetry.NewSpan(ctx, "collect worker_metrics")
		defer span.End()

		// Only run on the engine instance that has control over the internal tenant
		internalTenant, err := mc.p.GetInternalTenantForController(ctx)
		if err != nil {
			mc.l.Error().Err(err).Msg("could not get internal tenant")
			return
		}

		if internalTenant == nil {
			// This engine instance doesn't have control over the internal tenant
			return
		}

		mc.l.Debug().Msg("collecting worker metrics")

		// Count active slots per tenant
		slotsByTenant, slotsErr := mc.repo.Workers().CountActiveSlotsPerTenant()
		switch {
		case slotsErr != nil:
			mc.l.Error().Err(slotsErr).Msg("failed to count active slots per tenant")
		case len(slotsByTenant) == 0:
			mc.l.Debug().Msg("no active worker slots found")
		default:
			mc.l.Info().Int("tenant_count", len(slotsByTenant)).Msg("recording active slots metrics")
			for tenant, slotCount := range slotsByTenant {
				mc.recorder.RecordActiveSlots(ctx, tenant, slotCount)
				mc.l.Debug().Str("tenant_id", tenant).Int64("count", slotCount).Msg("recorded active slots metric")
			}
		}

		// Count active workers per tenant
		workersByTenant, workersErr := mc.repo.Workers().CountActiveWorkersPerTenant()
		switch {
		case workersErr != nil:
			mc.l.Error().Err(workersErr).Msg("failed to count active workers per tenant")
		case len(workersByTenant) == 0:
			mc.l.Debug().Msg("no active workers found")
		default:
			mc.l.Info().Int("tenant_count", len(workersByTenant)).Msg("recording active workers metrics")
			for tenant, workerCount := range workersByTenant {
				mc.recorder.RecordActiveWorkers(ctx, tenant, workerCount)
				mc.l.Debug().Str("tenant_id", tenant).Int64("count", workerCount).Msg("recorded active workers metric")
			}
		}

		// Count active SDKs per tenant
		sdksByTenant, sdksErr := mc.repo.Workers().ListActiveSDKsPerTenant()
		switch {
		case sdksErr != nil:
			mc.l.Error().Err(sdksErr).Msg("failed to list active SDKs per tenant")
		case len(sdksByTenant) == 0:
			mc.l.Debug().Msg("no active SDKs found")
		default:
			mc.l.Info().Int("sdk_count", len(sdksByTenant)).Msg("recording active SDKs metrics")
			for sdkTuple, sdkCount := range sdksByTenant {
				info := telemetry.SDKInfo{
					OperatingSystem: sdkTuple.SDK.OperatingSystem,
					Language:        sdkTuple.SDK.Language,
					LanguageVersion: sdkTuple.SDK.LanguageVersion,
					SdkVersion:      sdkTuple.SDK.SdkVersion,
				}
				mc.recorder.RecordActiveSDKs(ctx, sdkTuple.TenantId, info, sdkCount)
				mc.l.Debug().
					Str("tenant_id", sdkTuple.TenantId).
					Int64("count", sdkCount).
					Str("sdk_language", info.Language).
					Str("sdk_version", info.SdkVersion).
					Msg("recorded active SDKs metric")
			}
		}

		mc.l.Debug().Msg("finished collecting worker metrics")
	}
}

View File

@@ -3,8 +3,9 @@ package task
import (
"context"
"github.com/hatchet-dev/hatchet/pkg/telemetry"
"go.opentelemetry.io/otel/codes"
"github.com/hatchet-dev/hatchet/pkg/telemetry"
)
func (tc *TasksControllerImpl) processPayloadExternalCutovers(ctx context.Context) func() {

View File

@@ -231,12 +231,13 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms
)
if err != nil {
multiErr = multierror.Append(
multiErr,
fmt.Errorf("could not create monitoring event for task %d: %w", task.ID, err),
)
d.l.Error().Err(err).Int64("task_id", task.ID).Msg("could not create monitoring event")
} else {
defer d.pubBuffer.Pub(ctx, msgqueuev1.OLAP_QUEUE, msg, false)
defer func() {
if err := d.pubBuffer.Pub(ctx, msgqueuev1.OLAP_QUEUE, msg, false); err != nil {
d.l.Error().Err(err).Msg("could not publish monitoring event")
}
}()
}
return nil

View File

@@ -11,9 +11,9 @@ import (
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip" // Register gzip compression codec
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
_ "google.golang.org/grpc/encoding/gzip" // Register gzip compression codec
dispatchercontracts "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts"
sharedcontracts "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1"

View File

@@ -132,6 +132,21 @@ type CronOperationsConfigFile struct {
// OLAPAnalyzeCronInterval is the interval for the olap analyze cron operation
OLAPAnalyzeCronInterval time.Duration `mapstructure:"olapAnalyzeCronInterval" json:"olapAnalyzeCronInterval,omitempty" default:"3h"`
// DBHealthMetricsInterval is the interval for collecting database health metrics
DBHealthMetricsInterval time.Duration `mapstructure:"dbHealthMetricsInterval" json:"dbHealthMetricsInterval,omitempty" default:"60s"`
// OLAPMetricsInterval is the interval for collecting OLAP metrics
OLAPMetricsInterval time.Duration `mapstructure:"olapMetricsInterval" json:"olapMetricsInterval,omitempty" default:"5m"`
// WorkerMetricsInterval is the interval for collecting worker metrics
WorkerMetricsInterval time.Duration `mapstructure:"workerMetricsInterval" json:"workerMetricsInterval,omitempty" default:"60s"`
// YesterdayRunCountHour is the hour (0-23) at which to collect yesterday's run count metrics
YesterdayRunCountHour uint `mapstructure:"yesterdayRunCountHour" json:"yesterdayRunCountHour,omitempty" default:"0"`
// YesterdayRunCountMinute is the minute (0-59) at which to collect yesterday's run count metrics
YesterdayRunCountMinute uint `mapstructure:"yesterdayRunCountMinute" json:"yesterdayRunCountMinute,omitempty" default:"5"`
}
// OLAPStatusUpdateConfigFile is the configuration for OLAP status updates
@@ -873,6 +888,7 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("otel.traceIdRatio", "SERVER_OTEL_TRACE_ID_RATIO")
_ = v.BindEnv("otel.insecure", "SERVER_OTEL_INSECURE")
_ = v.BindEnv("otel.collectorAuth", "SERVER_OTEL_COLLECTOR_AUTH")
_ = v.BindEnv("otel.metricsEnabled", "SERVER_OTEL_METRICS_ENABLED")
// prometheus options
_ = v.BindEnv("prometheus.prometheusServerURL", "SERVER_PROMETHEUS_SERVER_URL")

View File

@@ -25,6 +25,7 @@ type OpenTelemetryConfigFile struct {
TraceIdRatio string `mapstructure:"traceIdRatio" json:"traceIdRatio,omitempty" default:"1"`
Insecure bool `mapstructure:"insecure" json:"insecure,omitempty" default:"false"`
CollectorAuth string `mapstructure:"collectorAuth" json:"collectorAuth,omitempty"`
MetricsEnabled bool `mapstructure:"metricsEnabled" json:"metricsEnabled,omitempty" default:"false"`
}
type PrometheusConfigFile struct {

View File

@@ -270,6 +270,10 @@ type OLAPRepository interface {
ListWorkflowRunExternalIds(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]pgtype.UUID, error)
ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error
CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context) (int64, error)
CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context) (int64, error)
ListYesterdayRunCountsByStatus(ctx context.Context) (map[sqlcv1.V1ReadableStatusOlap]int64, error)
}
type StatusUpdateBatchSizeLimits struct {
@@ -2461,7 +2465,7 @@ func (r *OLAPRepositoryImpl) PutPayloads(ctx context.Context, tx sqlcv1.DBTX, te
for i, opt := range putPayloadOpts {
storeOpts := OffloadToExternalStoreOpts{
TenantId: TenantID(tenantId),
TenantId: tenantId,
ExternalID: PayloadExternalId(opt.ExternalId.String()),
InsertedAt: opt.InsertedAt,
Payload: opt.Payload,
@@ -2745,6 +2749,30 @@ func (r *OLAPRepositoryImpl) StatusUpdateBatchSizeLimits() StatusUpdateBatchSize
return r.statusUpdateBatchSizeLimits
}
// CountOLAPTempTableSizeForDAGStatusUpdates returns the number of rows
// currently buffered in v1_task_status_updates_tmp, used as a backlog gauge
// for the DAG status-update pipeline. Reads from the read replica pool.
func (r *OLAPRepositoryImpl) CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context) (int64, error) {
	return r.queries.CountOLAPTempTableSizeForDAGStatusUpdates(ctx, r.readPool)
}
// CountOLAPTempTableSizeForTaskStatusUpdates returns the number of rows
// currently buffered in v1_task_events_olap_tmp, used as a backlog gauge
// for the task status-update pipeline. Reads from the read replica pool.
func (r *OLAPRepositoryImpl) CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context) (int64, error) {
	return r.queries.CountOLAPTempTableSizeForTaskStatusUpdates(ctx, r.readPool)
}
// ListYesterdayRunCountsByStatus returns a per-status count of the runs
// inserted during the previous calendar day, as computed by the database's
// clock and time zone.
func (r *OLAPRepositoryImpl) ListYesterdayRunCountsByStatus(ctx context.Context) (map[sqlcv1.V1ReadableStatusOlap]int64, error) {
	results, err := r.queries.ListYesterdayRunCountsByStatus(ctx, r.readPool)
	if err != nil {
		return nil, err
	}

	counts := make(map[sqlcv1.V1ReadableStatusOlap]int64, len(results))

	for _, result := range results {
		counts[result.ReadableStatus] = result.Count
	}

	return counts, nil
}
type BulkCutOverOLAPPayload struct {
TenantID pgtype.UUID
InsertedAt pgtype.Timestamptz

View File

@@ -10,15 +10,16 @@ import (
"time"
"github.com/google/uuid"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
"github.com/hatchet-dev/hatchet/pkg/telemetry"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/errgroup"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
"github.com/hatchet-dev/hatchet/pkg/telemetry"
)
type StorePayloadOpts struct {

View File

@@ -0,0 +1,273 @@
package v1
import (
"context"
"time"
"github.com/hatchet-dev/hatchet/pkg/repository/cache"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
)
// PGHealthRepository provides database health monitoring functionality.
//
// Important: Many health checks rely on PostgreSQL's statistics collector,
// which requires track_counts = on (enabled by default). If track_counts
// is disabled, queries against pg_stat_user_tables and pg_statio_user_tables
// will return empty results, and health checks will report no data available.
//
// To verify track_counts is enabled, run:
// SHOW track_counts;
//
// To enable track_counts (requires superuser or appropriate permissions):
// ALTER SYSTEM SET track_counts = on;
// SELECT pg_reload_conf();
// PGHealthError is a coarse severity level returned by the health checks.
type PGHealthError string

const (
	PGHealthAlert PGHealthError = "alert" // requires immediate attention
	PGHealthWarn  PGHealthError = "warn"  // degraded but not critical
	PGHealthOK    PGHealthError = "ok"    // healthy, or no data available to judge
)
// PGHealthRepository exposes read-only database health probes backed by
// PostgreSQL's statistics views. Checks that read pg_stat_user_tables or
// pg_statio_user_tables return no data when track_counts is off (see the
// file header note).
type PGHealthRepository interface {
	// PGStatStatementsEnabled reports whether the pg_stat_statements
	// extension is present (result cached).
	PGStatStatementsEnabled(ctx context.Context) (bool, error)
	// TrackCountsEnabled reports whether track_counts is on (result cached).
	TrackCountsEnabled(ctx context.Context) (bool, error)
	// CheckBloat returns a severity and the number of bloated tables.
	CheckBloat(ctx context.Context) (PGHealthError, int, error)
	// GetBloatDetails returns the per-table rows behind CheckBloat.
	GetBloatDetails(ctx context.Context) ([]*sqlcv1.CheckBloatRow, error)
	// CheckLongRunningQueries returns a severity and the number of queries
	// active for more than 5 minutes.
	CheckLongRunningQueries(ctx context.Context) (PGHealthError, int, error)
	// CheckQueryCache returns a severity and the number of tables with a
	// poor buffer-cache hit ratio.
	CheckQueryCache(ctx context.Context) (PGHealthError, int, error)
	// CheckQueryCaches returns the per-table cache-hit rows.
	CheckQueryCaches(ctx context.Context) ([]*sqlcv1.CheckQueryCachesRow, error)
	// CheckLongRunningVacuum returns a severity and the number of vacuums
	// running for more than 3 hours.
	CheckLongRunningVacuum(ctx context.Context) (PGHealthError, int, error)
	// CheckLastAutovacuumForPartitionedTables lists partitions ordered by
	// last autovacuum time (OLAP database).
	CheckLastAutovacuumForPartitionedTables(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesRow, error)
	// CheckLastAutovacuumForPartitionedTablesCoreDB is the same probe
	// against the core database.
	CheckLastAutovacuumForPartitionedTablesCoreDB(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesCoreDBRow, error)
}
// pgHealthRepository implements PGHealthRepository on top of the shared
// repository's pool and queries, caching the two feature-detection probes
// so they are not re-run on every health check.
type pgHealthRepository struct {
	*sharedRepository

	pgStatStatementsCache *cache.Cache // caches the PGStatStatementsEnabled result
	trackCountsCache      *cache.Cache // caches the TrackCountsEnabled result
}

// Cache keys for the feature-detection probes.
const (
	pgStatStatementsCacheKey = "pg_stat_statements_enabled"
	trackCountsCacheKey      = "track_counts_enabled"
)
// newPGHealthRepository wires a pgHealthRepository around the shared
// repository, with 10-minute caches for the two feature-detection probes.
func newPGHealthRepository(shared *sharedRepository) *pgHealthRepository {
	const probeCacheTTL = 10 * time.Minute

	repo := &pgHealthRepository{
		sharedRepository:      shared,
		pgStatStatementsCache: cache.New(probeCacheTTL),
		trackCountsCache:      cache.New(probeCacheTTL),
	}

	return repo
}
// PGStatStatementsEnabled reports whether the pg_stat_statements extension
// is available, caching the answer (positive or negative) in
// pgStatStatementsCache to avoid repeated catalog queries.
//
// NOTE(review): the underlying query counts rows in pg_available_extensions,
// which lists extensions available to install, not necessarily installed —
// if the intent is "installed and active", the query should also check
// installed_version; confirm.
func (h *pgHealthRepository) PGStatStatementsEnabled(ctx context.Context) (bool, error) {
	if cached, ok := h.pgStatStatementsCache.Get(pgStatStatementsCacheKey); ok {
		if enabled, ok := cached.(bool); ok {
			return enabled, nil
		}
	}

	// Bound the catalog probe so a stuck connection cannot stall metrics.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	count, err := h.queries.CheckPGStatStatementsEnabled(ctx, h.pool)
	if err != nil {
		return false, err
	}

	enabled := count > 0
	h.pgStatStatementsCache.Set(pgStatStatementsCacheKey, enabled)

	return enabled, nil
}
// TrackCountsEnabled reports whether the track_counts setting is on,
// caching the answer so the SHOW query is not re-issued on every check.
func (h *pgHealthRepository) TrackCountsEnabled(ctx context.Context) (bool, error) {
	if hit, ok := h.trackCountsCache.Get(trackCountsCacheKey); ok {
		if cachedVal, ok := hit.(bool); ok {
			return cachedVal, nil
		}
	}

	queryCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var setting string

	if err := h.pool.QueryRow(queryCtx, "SHOW track_counts").Scan(&setting); err != nil {
		return false, err
	}

	isOn := setting == "on"
	h.trackCountsCache.Set(trackCountsCacheKey, isOn)

	return isOn, nil
}
// CheckBloat reports tables whose dead-tuple ratio indicates significant
// bloat. It returns PGHealthWarn with the number of affected tables when
// any are found, PGHealthOK otherwise (including when statistics are
// unavailable).
//
// The underlying query reads pg_stat_user_tables, which is populated only
// when track_counts = on (see the file header note), so the check is gated
// on TrackCountsEnabled rather than on the unrelated pg_stat_statements
// extension.
func (h *pgHealthRepository) CheckBloat(ctx context.Context) (PGHealthError, int, error) {
	enabled, err := h.TrackCountsEnabled(ctx)
	if err != nil {
		return PGHealthOK, 0, err
	}

	if !enabled {
		// No statistics available; report healthy rather than guessing.
		return PGHealthOK, 0, nil
	}

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	rows, err := h.queries.CheckBloat(ctx, h.pool)
	if err != nil {
		return PGHealthOK, 0, err
	}

	if len(rows) == 0 {
		return PGHealthOK, 0, nil
	}

	return PGHealthWarn, len(rows), nil
}
// GetBloatDetails returns the per-table rows behind CheckBloat, or nil when
// table statistics are unavailable.
//
// The query reads pg_stat_user_tables, which requires track_counts = on
// (see the file header note), so the gate is TrackCountsEnabled rather than
// the unrelated pg_stat_statements extension.
func (h *pgHealthRepository) GetBloatDetails(ctx context.Context) ([]*sqlcv1.CheckBloatRow, error) {
	enabled, err := h.TrackCountsEnabled(ctx)
	if err != nil {
		return nil, err
	}

	if !enabled {
		return nil, nil
	}

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	return h.queries.CheckBloat(ctx, h.pool)
}
// CheckLongRunningQueries counts queries that have been active for more
// than 5 minutes (per the CheckLongRunningQueries SQL). It always returns
// PGHealthOK alongside the count.
//
// NOTE(review): unlike CheckBloat, this never escalates to Warn/Alert even
// when rows are found — confirm callers only consume the count. Also, the
// gate checks pg_stat_statements availability, but pg_stat_activity does
// not depend on that extension — confirm the gate is intentional.
func (h *pgHealthRepository) CheckLongRunningQueries(ctx context.Context) (PGHealthError, int, error) {
	enabled, err := h.PGStatStatementsEnabled(ctx)
	if err != nil {
		return PGHealthOK, 0, err
	}

	if !enabled {
		return PGHealthOK, 0, nil
	}

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	rows, err := h.queries.CheckLongRunningQueries(ctx, h.pool)
	if err != nil {
		return PGHealthOK, 0, err
	}

	return PGHealthOK, len(rows), nil
}
// CheckQueryCache reports tables with a poor buffer-cache hit ratio. It
// returns PGHealthWarn with the number of problem tables when any table's
// hit ratio falls in the flagged band, PGHealthOK otherwise.
//
// The underlying query reads pg_statio_user_tables, which is populated only
// when track_counts = on (see the file header note), so the check is gated
// on TrackCountsEnabled rather than on the unrelated pg_stat_statements
// extension.
func (h *pgHealthRepository) CheckQueryCache(ctx context.Context) (PGHealthError, int, error) {
	enabled, err := h.TrackCountsEnabled(ctx)
	if err != nil {
		return PGHealthOK, 0, err
	}

	if !enabled {
		return PGHealthOK, 0, nil
	}

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	tables, err := h.queries.CheckQueryCaches(ctx, h.pool)
	if err != nil {
		return PGHealthOK, 0, err
	}

	problemTables := make(map[string]float64)

	for _, table := range tables {
		hitRatio := table.CacheHitRatioPct

		// Flag sub-95% hit ratios. The > 10.0 lower bound excludes tables
		// with extremely low ratios — presumably cold/rarely-read tables
		// that would otherwise dominate the report; TODO confirm intent.
		if hitRatio < 95.0 && hitRatio > 10.0 {
			problemTables[table.Tablename.String] = hitRatio
		}
	}

	if len(problemTables) > 0 {
		return PGHealthWarn, len(problemTables), nil
	}

	return PGHealthOK, 0, nil
}
// CheckQueryCaches returns the per-table cache-hit rows behind
// CheckQueryCache, or nil when I/O statistics are unavailable.
//
// The query reads pg_statio_user_tables, which requires track_counts = on
// (see the file header note), so the gate is TrackCountsEnabled rather than
// the unrelated pg_stat_statements extension.
func (h *pgHealthRepository) CheckQueryCaches(ctx context.Context) ([]*sqlcv1.CheckQueryCachesRow, error) {
	enabled, err := h.TrackCountsEnabled(ctx)
	if err != nil {
		return nil, err
	}

	if !enabled {
		return nil, nil
	}

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	return h.queries.CheckQueryCaches(ctx, h.pool)
}
// CheckLongRunningVacuum inspects vacuums that have been running for more
// than 3 hours (per the LongRunningVacuum SQL). It returns PGHealthAlert if
// any has run for over 10 hours, PGHealthWarn if any exist, and PGHealthOK
// otherwise.
//
// NOTE(review): the gate checks pg_stat_statements availability, but
// pg_stat_progress_vacuum does not depend on that extension — confirm the
// gate is intentional.
func (h *pgHealthRepository) CheckLongRunningVacuum(ctx context.Context) (PGHealthError, int, error) {
	enabled, err := h.PGStatStatementsEnabled(ctx)
	if err != nil {
		return PGHealthOK, 0, err
	}

	if !enabled {
		return PGHealthOK, 0, nil
	}

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	rows, err := h.queries.LongRunningVacuum(ctx, h.pool)
	if err != nil {
		return PGHealthOK, 0, err
	}

	if len(rows) == 0 {
		return PGHealthOK, 0, nil
	}

	for _, row := range rows {
		// Threshold: 10 hours expressed in seconds.
		// NOTE(review): ElapsedTime is an int32 generated from a Postgres
		// interval column — verify it scans as seconds rather than failing
		// at runtime (see LongRunningVacuumRow).
		if row.ElapsedTime > int32(time.Hour.Seconds()*10) {
			return PGHealthAlert, len(rows), nil
		}
	}

	return PGHealthWarn, len(rows), nil
}
// CheckLastAutovacuumForPartitionedTables lists public-schema partitions
// ordered by last autovacuum time (never-vacuumed partitions sort last),
// for autovacuum-lag metrics on this repository's database.
func (h *pgHealthRepository) CheckLastAutovacuumForPartitionedTables(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesRow, error) {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	return h.queries.CheckLastAutovacuumForPartitionedTables(ctx, h.pool)
}
// CheckLastAutovacuumForPartitionedTablesCoreDB is the core-database variant
// of CheckLastAutovacuumForPartitionedTables; the generated query is defined
// against the core schema's sqlc package.
func (h *pgHealthRepository) CheckLastAutovacuumForPartitionedTablesCoreDB(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesCoreDBRow, error) {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	return h.queries.CheckLastAutovacuumForPartitionedTablesCoreDB(ctx, h.pool)
}

View File

@@ -35,6 +35,7 @@ type Repository interface {
Webhooks() WebhookRepository
Idempotency() IdempotencyRepository
IntervalSettings() IntervalSettingsRepository
PGHealth() PGHealthRepository
}
type repositoryImpl struct {
@@ -52,6 +53,7 @@ type repositoryImpl struct {
payloadStore PayloadStoreRepository
idempotency IdempotencyRepository
intervals IntervalSettingsRepository
pgHealth PGHealthRepository
}
func NewRepository(
@@ -83,6 +85,7 @@ func NewRepository(
payloadStore: shared.payloadStore,
idempotency: newIdempotencyRepository(shared),
intervals: newIntervalSettingsRepository(shared),
pgHealth: newPGHealthRepository(shared),
}
return impl, func() error {
@@ -157,3 +160,7 @@ func (r *repositoryImpl) Idempotency() IdempotencyRepository {
func (r *repositoryImpl) IntervalSettings() IntervalSettingsRepository {
return r.intervals
}
// PGHealth returns the repository used for database health monitoring.
func (r *repositoryImpl) PGHealth() PGHealthRepository {
	return r.pgHealth
}

View File

@@ -7,6 +7,7 @@ package sqlcv1
import (
"database/sql/driver"
"fmt"
"net/netip"
"github.com/jackc/pgx/v5/pgtype"
)
@@ -2432,6 +2433,102 @@ type MessageQueueItem struct {
Status MessageQueueItemStatus `json:"status"`
}
type PgAvailableExtensions struct {
Name pgtype.Text `json:"name"`
DefaultVersion pgtype.Text `json:"default_version"`
InstalledVersion pgtype.Text `json:"installed_version"`
Comment pgtype.Text `json:"comment"`
}
type PgStatActivity struct {
Pid pgtype.Int4 `json:"pid"`
Usename pgtype.Text `json:"usename"`
ApplicationName pgtype.Text `json:"application_name"`
ClientAddr *netip.Addr `json:"client_addr"`
State pgtype.Text `json:"state"`
QueryStart pgtype.Timestamptz `json:"query_start"`
WaitEventType pgtype.Text `json:"wait_event_type"`
WaitEvent pgtype.Text `json:"wait_event"`
Query pgtype.Text `json:"query"`
}
type PgStatProgressVacuum struct {
Pid pgtype.Int4 `json:"pid"`
Datid pgtype.Uint32 `json:"datid"`
Datname pgtype.Text `json:"datname"`
Relid pgtype.Uint32 `json:"relid"`
Phase pgtype.Text `json:"phase"`
HeapBlksTotal pgtype.Int8 `json:"heap_blks_total"`
HeapBlksScanned pgtype.Int8 `json:"heap_blks_scanned"`
HeapBlksVacuumed pgtype.Int8 `json:"heap_blks_vacuumed"`
HeapBlksFrozen pgtype.Int8 `json:"heap_blks_frozen"`
IndexVacuumCount pgtype.Int8 `json:"index_vacuum_count"`
MaxDeadTuples pgtype.Int8 `json:"max_dead_tuples"`
NumDeadTuples pgtype.Int8 `json:"num_dead_tuples"`
}
type PgStatStatements struct {
Userid pgtype.Uint32 `json:"userid"`
Dbid pgtype.Uint32 `json:"dbid"`
Queryid pgtype.Int8 `json:"queryid"`
Query pgtype.Text `json:"query"`
Calls pgtype.Int8 `json:"calls"`
TotalExecTime pgtype.Float8 `json:"total_exec_time"`
Rows pgtype.Int8 `json:"rows"`
SharedBlksHit pgtype.Int8 `json:"shared_blks_hit"`
SharedBlksRead pgtype.Int8 `json:"shared_blks_read"`
SharedBlksDirtied pgtype.Int8 `json:"shared_blks_dirtied"`
SharedBlksWritten pgtype.Int8 `json:"shared_blks_written"`
LocalBlksHit pgtype.Int8 `json:"local_blks_hit"`
LocalBlksRead pgtype.Int8 `json:"local_blks_read"`
LocalBlksDirtied pgtype.Int8 `json:"local_blks_dirtied"`
LocalBlksWritten pgtype.Int8 `json:"local_blks_written"`
TempBlksRead pgtype.Int8 `json:"temp_blks_read"`
TempBlksWritten pgtype.Int8 `json:"temp_blks_written"`
BlkReadTime pgtype.Float8 `json:"blk_read_time"`
BlkWriteTime pgtype.Float8 `json:"blk_write_time"`
}
type PgStatUserTables struct {
Relid pgtype.Uint32 `json:"relid"`
Schemaname pgtype.Text `json:"schemaname"`
Relname pgtype.Text `json:"relname"`
SeqScan pgtype.Int8 `json:"seq_scan"`
SeqTupRead pgtype.Int8 `json:"seq_tup_read"`
IdxScan pgtype.Int8 `json:"idx_scan"`
IdxTupFetch pgtype.Int8 `json:"idx_tup_fetch"`
NTupIns pgtype.Int8 `json:"n_tup_ins"`
NTupUpd pgtype.Int8 `json:"n_tup_upd"`
NTupDel pgtype.Int8 `json:"n_tup_del"`
NTupHotUpd pgtype.Int8 `json:"n_tup_hot_upd"`
NLiveTup pgtype.Int8 `json:"n_live_tup"`
NDeadTup pgtype.Int8 `json:"n_dead_tup"`
NModSinceAnalyze pgtype.Int8 `json:"n_mod_since_analyze"`
NInsSinceVacuum pgtype.Int8 `json:"n_ins_since_vacuum"`
VacuumCount pgtype.Int8 `json:"vacuum_count"`
AutovacuumCount pgtype.Int8 `json:"autovacuum_count"`
AnalyzeCount pgtype.Int8 `json:"analyze_count"`
AutoanalyzeCount pgtype.Int8 `json:"autoanalyze_count"`
LastVacuum pgtype.Timestamptz `json:"last_vacuum"`
LastAutovacuum pgtype.Timestamptz `json:"last_autovacuum"`
LastAnalyze pgtype.Timestamptz `json:"last_analyze"`
LastAutoanalyze pgtype.Timestamptz `json:"last_autoanalyze"`
}
type PgStatioUserTables struct {
Relid pgtype.Uint32 `json:"relid"`
Schemaname pgtype.Text `json:"schemaname"`
Relname pgtype.Text `json:"relname"`
HeapBlksRead pgtype.Int8 `json:"heap_blks_read"`
HeapBlksHit pgtype.Int8 `json:"heap_blks_hit"`
IdxBlksRead pgtype.Int8 `json:"idx_blks_read"`
IdxBlksHit pgtype.Int8 `json:"idx_blks_hit"`
ToastBlksRead pgtype.Int8 `json:"toast_blks_read"`
ToastBlksHit pgtype.Int8 `json:"toast_blks_hit"`
TidxBlksRead pgtype.Int8 `json:"tidx_blks_read"`
TidxBlksHit pgtype.Int8 `json:"tidx_blks_hit"`
}
type Queue struct {
ID int64 `json:"id"`
TenantId pgtype.UUID `json:"tenantId"`

View File

@@ -1989,6 +1989,37 @@ WHERE
)
;
-- name: CountOLAPTempTableSizeForDAGStatusUpdates :one
-- Backlog gauge for the DAG status-update pipeline.
-- NOTE(review): this counts v1_task_status_updates_tmp while the Task
-- variant below counts v1_task_events_olap_tmp — confirm the table names
-- are not swapped relative to the query names.
SELECT COUNT(*) AS total
FROM v1_task_status_updates_tmp
;

-- name: CountOLAPTempTableSizeForTaskStatusUpdates :one
-- Backlog gauge for the task status-update pipeline.
SELECT COUNT(*) AS total
FROM v1_task_events_olap_tmp
;

-- name: ListYesterdayRunCountsByStatus :many
-- Per-status counts of runs inserted during the previous calendar day,
-- computed with the database server's clock and time zone.
SELECT readable_status, COUNT(*)
FROM v1_runs_olap
WHERE inserted_at::DATE = (NOW() - INTERVAL '1 day')::DATE
GROUP BY readable_status
;

-- name: CheckLastAutovacuumForPartitionedTables :many
-- Lists public-schema partition tables ordered by last autovacuum time;
-- partitions never autovacuumed sort last (NULLS LAST). Requires
-- track_counts = on for pg_stat_user_tables to have data.
SELECT
    s.schemaname,
    s.relname AS tablename,
    s.last_autovacuum,
    EXTRACT(EPOCH FROM (NOW() - s.last_autovacuum)) AS seconds_since_last_autovacuum
FROM pg_stat_user_tables s
JOIN pg_catalog.pg_class c ON c.oid = (quote_ident(s.schemaname)||'.'||quote_ident(s.relname))::regclass
WHERE s.schemaname = 'public'
  AND c.relispartition = true
  AND c.relkind = 'r'
ORDER BY s.last_autovacuum ASC NULLS LAST
;
-- name: ListPaginatedOLAPPayloadsForOffload :many
WITH payloads AS (
SELECT

View File

@@ -158,6 +158,52 @@ type BulkCreateEventTriggersParams struct {
FilterID pgtype.UUID `json:"filter_id"`
}
const checkLastAutovacuumForPartitionedTables = `-- name: CheckLastAutovacuumForPartitionedTables :many
SELECT
s.schemaname,
s.relname AS tablename,
s.last_autovacuum,
EXTRACT(EPOCH FROM (NOW() - s.last_autovacuum)) AS seconds_since_last_autovacuum
FROM pg_stat_user_tables s
JOIN pg_catalog.pg_class c ON c.oid = (quote_ident(s.schemaname)||'.'||quote_ident(s.relname))::regclass
WHERE s.schemaname = 'public'
AND c.relispartition = true
AND c.relkind = 'r'
ORDER BY s.last_autovacuum ASC NULLS LAST
`
type CheckLastAutovacuumForPartitionedTablesRow struct {
Schemaname pgtype.Text `json:"schemaname"`
Tablename pgtype.Text `json:"tablename"`
LastAutovacuum pgtype.Timestamptz `json:"last_autovacuum"`
SecondsSinceLastAutovacuum pgtype.Numeric `json:"seconds_since_last_autovacuum"`
}
func (q *Queries) CheckLastAutovacuumForPartitionedTables(ctx context.Context, db DBTX) ([]*CheckLastAutovacuumForPartitionedTablesRow, error) {
rows, err := db.Query(ctx, checkLastAutovacuumForPartitionedTables)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*CheckLastAutovacuumForPartitionedTablesRow
for rows.Next() {
var i CheckLastAutovacuumForPartitionedTablesRow
if err := rows.Scan(
&i.Schemaname,
&i.Tablename,
&i.LastAutovacuum,
&i.SecondsSinceLastAutovacuum,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const cleanUpOLAPCutoverJobOffsets = `-- name: CleanUpOLAPCutoverJobOffsets :exec
DELETE FROM v1_payload_cutover_job_offset
WHERE NOT key = ANY($1::DATE[])
@@ -255,6 +301,30 @@ func (q *Queries) CountEvents(ctx context.Context, db DBTX, arg CountEventsParam
return count, err
}
const countOLAPTempTableSizeForDAGStatusUpdates = `-- name: CountOLAPTempTableSizeForDAGStatusUpdates :one
SELECT COUNT(*) AS total
FROM v1_task_status_updates_tmp
`
func (q *Queries) CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context, db DBTX) (int64, error) {
row := db.QueryRow(ctx, countOLAPTempTableSizeForDAGStatusUpdates)
var total int64
err := row.Scan(&total)
return total, err
}
const countOLAPTempTableSizeForTaskStatusUpdates = `-- name: CountOLAPTempTableSizeForTaskStatusUpdates :one
SELECT COUNT(*) AS total
FROM v1_task_events_olap_tmp
`
func (q *Queries) CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context, db DBTX) (int64, error) {
row := db.QueryRow(ctx, countOLAPTempTableSizeForTaskStatusUpdates)
var total int64
err := row.Scan(&total)
return total, err
}
type CreateDAGsOLAPParams struct {
TenantID pgtype.UUID `json:"tenant_id"`
ID int64 `json:"id"`
@@ -1948,6 +2018,38 @@ func (q *Queries) ListWorkflowRunExternalIds(ctx context.Context, db DBTX, arg L
return items, nil
}
const listYesterdayRunCountsByStatus = `-- name: ListYesterdayRunCountsByStatus :many
SELECT readable_status, COUNT(*)
FROM v1_runs_olap
WHERE inserted_at::DATE = (NOW() - INTERVAL '1 day')::DATE
GROUP BY readable_status
`
type ListYesterdayRunCountsByStatusRow struct {
ReadableStatus V1ReadableStatusOlap `json:"readable_status"`
Count int64 `json:"count"`
}
func (q *Queries) ListYesterdayRunCountsByStatus(ctx context.Context, db DBTX) ([]*ListYesterdayRunCountsByStatusRow, error) {
rows, err := db.Query(ctx, listYesterdayRunCountsByStatus)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ListYesterdayRunCountsByStatusRow
for rows.Next() {
var i ListYesterdayRunCountsByStatusRow
if err := rows.Scan(&i.ReadableStatus, &i.Count); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const markOLAPCutoverJobAsCompleted = `-- name: MarkOLAPCutoverJobAsCompleted :exec
UPDATE v1_payloads_olap_cutover_job_offset
SET is_completed = TRUE

View File

@@ -0,0 +1,95 @@
-- name: CheckPGStatStatementsEnabled :one
-- NOTE(review): pg_available_extensions lists extensions that are available
-- to install, not necessarily installed. If the intent is "installed", add
-- "AND installed_version IS NOT NULL" (or query pg_extension) — confirm.
SELECT COUNT(*) FROM pg_available_extensions WHERE name = 'pg_stat_statements';
-- name: CheckBloat :many
-- Note: Requires track_counts = on (enabled by default in PostgreSQL)
SELECT
schemaname,
relname AS tablename,
pg_size_pretty(pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS total_size,
pg_size_pretty(pg_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS table_size,
n_live_tup AS live_tuples,
n_dead_tup AS dead_tuples,
ROUND(100 * n_dead_tup::numeric / NULLIF(n_live_tup, 0), 2) AS dead_pct,
last_vacuum,
last_autovacuum,
last_analyze
FROM
pg_stat_user_tables
WHERE
n_dead_tup > 1000
AND n_live_tup > 1000
AND ROUND(100 * n_dead_tup::numeric / NULLIF(n_live_tup, 0), 2) > 50
AND relname NOT IN (
'Lease'
)
ORDER BY
dead_pct DESC NULLS LAST;
-- name: CheckLongRunningQueries :many
SELECT
pid,
usename,
application_name,
client_addr,
state,
now() - query_start AS duration,
query
FROM
pg_stat_activity
WHERE
state = 'active'
AND now() - query_start > interval '5 minutes'
AND query NOT LIKE 'autovacuum:%'
AND query NOT LIKE '%pg_stat_activity%'
ORDER BY
query_start;
-- name: CheckQueryCaches :many
-- Note: Requires track_counts = on (enabled by default in PostgreSQL)
SELECT
schemaname,
relname AS tablename,
heap_blks_read,
heap_blks_hit,
heap_blks_hit + heap_blks_read AS total_reads,
ROUND(
100.0 * heap_blks_hit / NULLIF(heap_blks_hit + heap_blks_read, 0),
2
)::float AS cache_hit_ratio_pct,
pg_size_pretty(pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS total_size
FROM
pg_statio_user_tables
WHERE
heap_blks_read + heap_blks_hit > 0
ORDER BY
heap_blks_read DESC
LIMIT 20;
-- name: LongRunningVacuum :many
SELECT
p.pid,
p.relid::regclass AS table_name,
pg_size_pretty(pg_total_relation_size(p.relid)) AS table_size,
p.phase,
p.heap_blks_total AS total_blocks,
p.heap_blks_scanned AS scanned_blocks,
ROUND(100.0 * p.heap_blks_scanned / NULLIF(p.heap_blks_total, 0), 2) AS pct_scanned,
p.heap_blks_vacuumed AS vacuumed_blocks,
p.index_vacuum_count,
now() - a.query_start AS elapsed_time,
CASE
WHEN p.heap_blks_scanned > 0 THEN
((now() - a.query_start) * (p.heap_blks_total - p.heap_blks_scanned) / p.heap_blks_scanned)
ELSE interval '0'
END AS estimated_time_remaining,
a.wait_event_type,
a.wait_event,
a.query
FROM
pg_stat_progress_vacuum p
JOIN pg_stat_activity a ON p.pid = a.pid
WHERE
now() - a.query_start > interval '3 hours'
ORDER BY
a.query_start;

View File

@@ -0,0 +1,293 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.29.0
// source: pg_health.sql
package sqlcv1
import (
"context"
"net/netip"
"github.com/jackc/pgx/v5/pgtype"
)
const checkBloat = `-- name: CheckBloat :many
SELECT
schemaname,
relname AS tablename,
pg_size_pretty(pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS total_size,
pg_size_pretty(pg_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS table_size,
n_live_tup AS live_tuples,
n_dead_tup AS dead_tuples,
ROUND(100 * n_dead_tup::numeric / NULLIF(n_live_tup, 0), 2) AS dead_pct,
last_vacuum,
last_autovacuum,
last_analyze
FROM
pg_stat_user_tables
WHERE
n_dead_tup > 1000
AND n_live_tup > 1000
AND ROUND(100 * n_dead_tup::numeric / NULLIF(n_live_tup, 0), 2) > 50
AND relname NOT IN (
'Lease'
)
ORDER BY
dead_pct DESC NULLS LAST
`
type CheckBloatRow struct {
Schemaname pgtype.Text `json:"schemaname"`
Tablename pgtype.Text `json:"tablename"`
TotalSize string `json:"total_size"`
TableSize string `json:"table_size"`
LiveTuples pgtype.Int8 `json:"live_tuples"`
DeadTuples pgtype.Int8 `json:"dead_tuples"`
DeadPct pgtype.Numeric `json:"dead_pct"`
LastVacuum pgtype.Timestamptz `json:"last_vacuum"`
LastAutovacuum pgtype.Timestamptz `json:"last_autovacuum"`
LastAnalyze pgtype.Timestamptz `json:"last_analyze"`
}
// Note: Requires track_counts = on (enabled by default in PostgreSQL)
func (q *Queries) CheckBloat(ctx context.Context, db DBTX) ([]*CheckBloatRow, error) {
rows, err := db.Query(ctx, checkBloat)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*CheckBloatRow
for rows.Next() {
var i CheckBloatRow
if err := rows.Scan(
&i.Schemaname,
&i.Tablename,
&i.TotalSize,
&i.TableSize,
&i.LiveTuples,
&i.DeadTuples,
&i.DeadPct,
&i.LastVacuum,
&i.LastAutovacuum,
&i.LastAnalyze,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const checkLongRunningQueries = `-- name: CheckLongRunningQueries :many
SELECT
pid,
usename,
application_name,
client_addr,
state,
now() - query_start AS duration,
query
FROM
pg_stat_activity
WHERE
state = 'active'
AND now() - query_start > interval '5 minutes'
AND query NOT LIKE 'autovacuum:%'
AND query NOT LIKE '%pg_stat_activity%'
ORDER BY
query_start
`
// CheckLongRunningQueriesRow is the result row for CheckLongRunningQueries.
//
// NOTE(review): Duration is generated as int32, but the SQL computes
// "now() - query_start", a Postgres interval — confirm pgx can scan that
// into an int32 (it is not a default interval mapping); a sqlc type
// override may be needed.
type CheckLongRunningQueriesRow struct {
	Pid             pgtype.Int4 `json:"pid"`
	Usename         pgtype.Text `json:"usename"`
	ApplicationName pgtype.Text `json:"application_name"`
	ClientAddr      *netip.Addr `json:"client_addr"`
	State           pgtype.Text `json:"state"`
	Duration        int32       `json:"duration"`
	Query           pgtype.Text `json:"query"`
}
func (q *Queries) CheckLongRunningQueries(ctx context.Context, db DBTX) ([]*CheckLongRunningQueriesRow, error) {
rows, err := db.Query(ctx, checkLongRunningQueries)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*CheckLongRunningQueriesRow
for rows.Next() {
var i CheckLongRunningQueriesRow
if err := rows.Scan(
&i.Pid,
&i.Usename,
&i.ApplicationName,
&i.ClientAddr,
&i.State,
&i.Duration,
&i.Query,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const checkPGStatStatementsEnabled = `-- name: CheckPGStatStatementsEnabled :one
SELECT COUNT(*) FROM pg_available_extensions WHERE name = 'pg_stat_statements'
`
func (q *Queries) CheckPGStatStatementsEnabled(ctx context.Context, db DBTX) (int64, error) {
row := db.QueryRow(ctx, checkPGStatStatementsEnabled)
var count int64
err := row.Scan(&count)
return count, err
}
const checkQueryCaches = `-- name: CheckQueryCaches :many
SELECT
schemaname,
relname AS tablename,
heap_blks_read,
heap_blks_hit,
heap_blks_hit + heap_blks_read AS total_reads,
ROUND(
100.0 * heap_blks_hit / NULLIF(heap_blks_hit + heap_blks_read, 0),
2
)::float AS cache_hit_ratio_pct,
pg_size_pretty(pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname))) AS total_size
FROM
pg_statio_user_tables
WHERE
heap_blks_read + heap_blks_hit > 0
ORDER BY
heap_blks_read DESC
LIMIT 20
`
type CheckQueryCachesRow struct {
Schemaname pgtype.Text `json:"schemaname"`
Tablename pgtype.Text `json:"tablename"`
HeapBlksRead pgtype.Int8 `json:"heap_blks_read"`
HeapBlksHit pgtype.Int8 `json:"heap_blks_hit"`
TotalReads int32 `json:"total_reads"`
CacheHitRatioPct float64 `json:"cache_hit_ratio_pct"`
TotalSize string `json:"total_size"`
}
// Note: Requires track_counts = on (enabled by default in PostgreSQL)
func (q *Queries) CheckQueryCaches(ctx context.Context, db DBTX) ([]*CheckQueryCachesRow, error) {
rows, err := db.Query(ctx, checkQueryCaches)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*CheckQueryCachesRow
for rows.Next() {
var i CheckQueryCachesRow
if err := rows.Scan(
&i.Schemaname,
&i.Tablename,
&i.HeapBlksRead,
&i.HeapBlksHit,
&i.TotalReads,
&i.CacheHitRatioPct,
&i.TotalSize,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const longRunningVacuum = `-- name: LongRunningVacuum :many
SELECT
p.pid,
p.relid::regclass AS table_name,
pg_size_pretty(pg_total_relation_size(p.relid)) AS table_size,
p.phase,
p.heap_blks_total AS total_blocks,
p.heap_blks_scanned AS scanned_blocks,
ROUND(100.0 * p.heap_blks_scanned / NULLIF(p.heap_blks_total, 0), 2) AS pct_scanned,
p.heap_blks_vacuumed AS vacuumed_blocks,
p.index_vacuum_count,
now() - a.query_start AS elapsed_time,
CASE
WHEN p.heap_blks_scanned > 0 THEN
((now() - a.query_start) * (p.heap_blks_total - p.heap_blks_scanned) / p.heap_blks_scanned)
ELSE interval '0'
END AS estimated_time_remaining,
a.wait_event_type,
a.wait_event,
a.query
FROM
pg_stat_progress_vacuum p
JOIN pg_stat_activity a ON p.pid = a.pid
WHERE
now() - a.query_start > interval '3 hours'
ORDER BY
a.query_start
`
// LongRunningVacuumRow is the result row for LongRunningVacuum.
//
// NOTE(review): ElapsedTime is generated as int32 for an interval column
// ("now() - a.query_start"), and TableName is interface{} for the regclass
// cast — confirm both scan as intended at runtime; sqlc type overrides may
// be needed (CheckLongRunningVacuum compares ElapsedTime against seconds).
type LongRunningVacuumRow struct {
	Pid                    pgtype.Int4     `json:"pid"`
	TableName              interface{}     `json:"table_name"`
	TableSize              string          `json:"table_size"`
	Phase                  pgtype.Text     `json:"phase"`
	TotalBlocks            pgtype.Int8     `json:"total_blocks"`
	ScannedBlocks          pgtype.Int8     `json:"scanned_blocks"`
	PctScanned             pgtype.Numeric  `json:"pct_scanned"`
	VacuumedBlocks         pgtype.Int8     `json:"vacuumed_blocks"`
	IndexVacuumCount       pgtype.Int8     `json:"index_vacuum_count"`
	ElapsedTime            int32           `json:"elapsed_time"`
	EstimatedTimeRemaining pgtype.Interval `json:"estimated_time_remaining"`
	WaitEventType          pgtype.Text     `json:"wait_event_type"`
	WaitEvent              pgtype.Text     `json:"wait_event"`
	Query                  pgtype.Text     `json:"query"`
}
func (q *Queries) LongRunningVacuum(ctx context.Context, db DBTX) ([]*LongRunningVacuumRow, error) {
rows, err := db.Query(ctx, longRunningVacuum)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*LongRunningVacuumRow
for rows.Next() {
var i LongRunningVacuumRow
if err := rows.Scan(
&i.Pid,
&i.TableName,
&i.TableSize,
&i.Phase,
&i.TotalBlocks,
&i.ScannedBlocks,
&i.PctScanned,
&i.VacuumedBlocks,
&i.IndexVacuumCount,
&i.ElapsedTime,
&i.EstimatedTimeRemaining,
&i.WaitEventType,
&i.WaitEvent,
&i.Query,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}

View File

@@ -23,10 +23,12 @@ sql:
- webhooks.sql
- idempotency-keys.sql
- interval_settings.sql
- pg_health.sql
schema:
- ../../../../sql/schema/v0.sql
- ../../../../sql/schema/v1-core.sql
- ../../../../sql/schema/v1-olap.sql
- ../../../../sql/schema/pg-stubs.sql
- ./concurrency-additional-tables.sql
strict_order_by: false
gen:

View File

@@ -1124,3 +1124,29 @@ SELECT
count,
oldest::TIMESTAMPTZ
FROM running_tasks;
-- name: FindOldestRunningTask :one
-- Returns the runtime row with the smallest (task_id, task_inserted_at),
-- so the sort can be satisfied by the primary-key index. This equals the
-- oldest running task only if task IDs are assigned in insertion order.
-- NOTE(review): if IDs can ever lag insertion order, "oldest" should order
-- by task_inserted_at first — confirm.
SELECT *
FROM v1_task_runtime
ORDER BY task_id, task_inserted_at
LIMIT 1;

-- name: FindOldestTask :one
-- Returns the task row with the smallest (id, inserted_at); same
-- ID-ordering caveat as FindOldestRunningTask above.
SELECT *
FROM v1_task
ORDER BY id, inserted_at
LIMIT 1;
-- name: CheckLastAutovacuumForPartitionedTablesCoreDB :many
SELECT
s.schemaname,
s.relname AS tablename,
s.last_autovacuum,
EXTRACT(EPOCH FROM (NOW() - s.last_autovacuum)) AS seconds_since_last_autovacuum
FROM pg_stat_user_tables s
JOIN pg_catalog.pg_class c ON c.oid = (quote_ident(s.schemaname)||'.'||quote_ident(s.relname))::regclass
WHERE s.schemaname = 'public'
AND c.relispartition = true
AND c.relkind = 'r'
ORDER BY s.last_autovacuum ASC NULLS LAST
;

View File

@@ -39,6 +39,52 @@ func (q *Queries) AnalyzeV1TaskEvent(ctx context.Context, db DBTX) error {
return err
}
// checkLastAutovacuumForPartitionedTablesCoreDB is the sqlc-managed SQL for
// listing leaf partitions in the public schema with their last autovacuum
// time and the seconds elapsed since it ran.
const checkLastAutovacuumForPartitionedTablesCoreDB = `-- name: CheckLastAutovacuumForPartitionedTablesCoreDB :many
SELECT
s.schemaname,
s.relname AS tablename,
s.last_autovacuum,
EXTRACT(EPOCH FROM (NOW() - s.last_autovacuum)) AS seconds_since_last_autovacuum
FROM pg_stat_user_tables s
JOIN pg_catalog.pg_class c ON c.oid = (quote_ident(s.schemaname)||'.'||quote_ident(s.relname))::regclass
WHERE s.schemaname = 'public'
AND c.relispartition = true
AND c.relkind = 'r'
ORDER BY s.last_autovacuum ASC NULLS LAST
`

// CheckLastAutovacuumForPartitionedTablesCoreDBRow is one partition's
// autovacuum status. LastAutovacuum and SecondsSinceLastAutovacuum are NULL
// when autovacuum has never run for the partition.
type CheckLastAutovacuumForPartitionedTablesCoreDBRow struct {
	Schemaname                 pgtype.Text        `json:"schemaname"`
	Tablename                  pgtype.Text        `json:"tablename"`
	LastAutovacuum             pgtype.Timestamptz `json:"last_autovacuum"`
	SecondsSinceLastAutovacuum pgtype.Numeric     `json:"seconds_since_last_autovacuum"`
}

// CheckLastAutovacuumForPartitionedTablesCoreDB returns autovacuum status for
// every leaf partition in the public schema, oldest-autovacuumed first.
func (q *Queries) CheckLastAutovacuumForPartitionedTablesCoreDB(ctx context.Context, db DBTX) ([]*CheckLastAutovacuumForPartitionedTablesCoreDBRow, error) {
	rows, err := db.Query(ctx, checkLastAutovacuumForPartitionedTablesCoreDB)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []*CheckLastAutovacuumForPartitionedTablesCoreDBRow
	for rows.Next() {
		var i CheckLastAutovacuumForPartitionedTablesCoreDBRow
		if err := rows.Scan(
			&i.Schemaname,
			&i.Tablename,
			&i.LastAutovacuum,
			&i.SecondsSinceLastAutovacuum,
		); err != nil {
			return nil, err
		}
		items = append(items, &i)
	}
	// rows.Err surfaces errors encountered during iteration (e.g. a dropped
	// connection) that rows.Next silently stops on.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
const cleanupV1ConcurrencySlot = `-- name: CleanupV1ConcurrencySlot :execresult
WITH locked_cs AS (
SELECT cs.task_id, cs.task_inserted_at, cs.task_retry_count
@@ -439,6 +485,79 @@ func (q *Queries) FailTaskInternalFailure(ctx context.Context, db DBTX, arg Fail
return items, nil
}
// findOldestRunningTask is the sqlc-managed SQL selecting the first
// v1_task_runtime row in (task_id, task_inserted_at) order.
const findOldestRunningTask = `-- name: FindOldestRunningTask :one
SELECT task_id, task_inserted_at, retry_count, worker_id, tenant_id, timeout_at
FROM v1_task_runtime
ORDER BY task_id, task_inserted_at
LIMIT 1
`

// FindOldestRunningTask returns the first v1_task_runtime row in
// (task_id, task_inserted_at) order.
//
// Note: the returned pointer is non-nil even when err is non-nil (the struct
// is returned unconditionally), so callers must check err before using it.
func (q *Queries) FindOldestRunningTask(ctx context.Context, db DBTX) (*V1TaskRuntime, error) {
	row := db.QueryRow(ctx, findOldestRunningTask)
	var i V1TaskRuntime
	err := row.Scan(
		&i.TaskID,
		&i.TaskInsertedAt,
		&i.RetryCount,
		&i.WorkerID,
		&i.TenantID,
		&i.TimeoutAt,
	)
	return &i, err
}
// findOldestTask is the sqlc-managed SQL selecting the full first v1_task row
// in (id, inserted_at) order.
const findOldestTask = `-- name: FindOldestTask :one
SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff
FROM v1_task
ORDER BY id, inserted_at
LIMIT 1
`

// FindOldestTask returns the first v1_task row in (id, inserted_at) order.
//
// Note: the returned pointer is non-nil even when err is non-nil, so callers
// must check err before using the row. The scan list below must stay in sync
// with the column list in the query above.
func (q *Queries) FindOldestTask(ctx context.Context, db DBTX) (*V1Task, error) {
	row := db.QueryRow(ctx, findOldestTask)
	var i V1Task
	err := row.Scan(
		&i.ID,
		&i.InsertedAt,
		&i.TenantID,
		&i.Queue,
		&i.ActionID,
		&i.StepID,
		&i.StepReadableID,
		&i.WorkflowID,
		&i.WorkflowVersionID,
		&i.WorkflowRunID,
		&i.ScheduleTimeout,
		&i.StepTimeout,
		&i.Priority,
		&i.Sticky,
		&i.DesiredWorkerID,
		&i.ExternalID,
		&i.DisplayName,
		&i.Input,
		&i.RetryCount,
		&i.InternalRetryCount,
		&i.AppRetryCount,
		&i.StepIndex,
		&i.AdditionalMetadata,
		&i.DagID,
		&i.DagInsertedAt,
		&i.ParentTaskExternalID,
		&i.ParentTaskID,
		&i.ParentTaskInsertedAt,
		&i.ChildIndex,
		&i.ChildKey,
		&i.InitialState,
		&i.InitialStateReason,
		&i.ConcurrencyParentStrategyIds,
		&i.ConcurrencyStrategyIds,
		&i.ConcurrencyKeys,
		&i.RetryBackoffFactor,
		&i.RetryMaxBackoff,
	)
	return &i, err
}
const flattenExternalIds = `-- name: FlattenExternalIds :many
WITH lookup_rows AS (
SELECT

View File

@@ -83,3 +83,42 @@ WHERE
AND runtime.worker_id = @workerId::uuid
LIMIT
COALESCE(sqlc.narg('limit')::int, 100);
-- name: ListTotalActiveSlotsPerTenant :many
-- Sums "maxRuns" across active workers, grouped by tenant. A worker counts as
-- active when it has a dispatcher assigned, heartbeated within the last 5
-- seconds, is marked active, and is not paused (same predicate as
-- ListActiveSDKsPerTenant / ListActiveWorkersPerTenant below).
SELECT "tenantId", SUM("maxRuns") AS "totalActiveSlots"
FROM "Worker"
WHERE
"dispatcherId" IS NOT NULL
AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
AND "isActive" = true
AND "isPaused" = false
GROUP BY "tenantId"
;
-- name: ListActiveSDKsPerTenant :many
-- Counts active workers per tenant, broken down by SDK identity (language,
-- language version, SDK version, OS). NULL SDK fields are normalized to
-- 'unknown' so they form a single bucket; "language" is an enum-like column
-- cast to TEXT before the COALESCE.
SELECT
"tenantId",
COALESCE("language"::TEXT, 'unknown')::TEXT AS "language",
COALESCE("languageVersion", 'unknown') AS "languageVersion",
COALESCE("sdkVersion", 'unknown') AS "sdkVersion",
COALESCE("os", 'unknown') AS "os",
COUNT(*) AS "count"
FROM "Worker"
WHERE
"dispatcherId" IS NOT NULL
AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
AND "isActive" = true
AND "isPaused" = false
GROUP BY "tenantId", "language", "languageVersion", "sdkVersion", "os"
;
-- name: ListActiveWorkersPerTenant :many
-- Counts active workers per tenant, using the same activity predicate as the
-- two queries above (dispatcher assigned, heartbeat within 5 seconds, active
-- and not paused).
SELECT "tenantId", COUNT(*)
FROM "Worker"
WHERE
"dispatcherId" IS NOT NULL
AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
AND "isActive" = true
AND "isPaused" = false
GROUP BY "tenantId"
;

View File

@@ -65,6 +65,95 @@ func (q *Queries) GetWorkerById(ctx context.Context, db DBTX, id pgtype.UUID) (*
return &i, err
}
// listActiveSDKsPerTenant is the sqlc-managed SQL counting active workers per
// tenant broken down by SDK identity; NULL SDK fields are normalized to
// 'unknown'.
const listActiveSDKsPerTenant = `-- name: ListActiveSDKsPerTenant :many
SELECT
"tenantId",
COALESCE("language"::TEXT, 'unknown')::TEXT AS "language",
COALESCE("languageVersion", 'unknown') AS "languageVersion",
COALESCE("sdkVersion", 'unknown') AS "sdkVersion",
COALESCE("os", 'unknown') AS "os",
COUNT(*) AS "count"
FROM "Worker"
WHERE
"dispatcherId" IS NOT NULL
AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
AND "isActive" = true
AND "isPaused" = false
GROUP BY "tenantId", "language", "languageVersion", "sdkVersion", "os"
`

// ListActiveSDKsPerTenantRow is one (tenant, SDK identity) group with its
// active-worker count. The string fields are never empty: the query
// substitutes 'unknown' for NULLs.
type ListActiveSDKsPerTenantRow struct {
	TenantId        pgtype.UUID `json:"tenantId"`
	Language        string      `json:"language"`
	LanguageVersion string      `json:"languageVersion"`
	SdkVersion      string      `json:"sdkVersion"`
	Os              string      `json:"os"`
	Count           int64       `json:"count"`
}

// ListActiveSDKsPerTenant returns active-worker counts grouped by tenant and
// SDK identity.
func (q *Queries) ListActiveSDKsPerTenant(ctx context.Context, db DBTX) ([]*ListActiveSDKsPerTenantRow, error) {
	rows, err := db.Query(ctx, listActiveSDKsPerTenant)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []*ListActiveSDKsPerTenantRow
	for rows.Next() {
		var i ListActiveSDKsPerTenantRow
		if err := rows.Scan(
			&i.TenantId,
			&i.Language,
			&i.LanguageVersion,
			&i.SdkVersion,
			&i.Os,
			&i.Count,
		); err != nil {
			return nil, err
		}
		items = append(items, &i)
	}
	// Surface iteration errors that rows.Next silently stops on.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
// listActiveWorkersPerTenant is the sqlc-managed SQL counting active workers
// grouped by tenant.
const listActiveWorkersPerTenant = `-- name: ListActiveWorkersPerTenant :many
SELECT "tenantId", COUNT(*)
FROM "Worker"
WHERE
"dispatcherId" IS NOT NULL
AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
AND "isActive" = true
AND "isPaused" = false
GROUP BY "tenantId"
`

// ListActiveWorkersPerTenantRow is one tenant's active-worker count.
type ListActiveWorkersPerTenantRow struct {
	TenantId pgtype.UUID `json:"tenantId"`
	Count    int64       `json:"count"`
}

// ListActiveWorkersPerTenant returns the number of active workers per tenant.
// Tenants with no active workers are absent from the result.
func (q *Queries) ListActiveWorkersPerTenant(ctx context.Context, db DBTX) ([]*ListActiveWorkersPerTenantRow, error) {
	rows, err := db.Query(ctx, listActiveWorkersPerTenant)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []*ListActiveWorkersPerTenantRow
	for rows.Next() {
		var i ListActiveWorkersPerTenantRow
		if err := rows.Scan(&i.TenantId, &i.Count); err != nil {
			return nil, err
		}
		items = append(items, &i)
	}
	// Surface iteration errors that rows.Next silently stops on.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
const listManyWorkerLabels = `-- name: ListManyWorkerLabels :many
SELECT
"id",
@@ -246,6 +335,42 @@ func (q *Queries) ListSemaphoreSlotsWithStateForWorker(ctx context.Context, db D
return items, nil
}
// listTotalActiveSlotsPerTenant is the sqlc-managed SQL summing "maxRuns"
// across active workers, grouped by tenant.
const listTotalActiveSlotsPerTenant = `-- name: ListTotalActiveSlotsPerTenant :many
SELECT "tenantId", SUM("maxRuns") AS "totalActiveSlots"
FROM "Worker"
WHERE
"dispatcherId" IS NOT NULL
AND "lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
AND "isActive" = true
AND "isPaused" = false
GROUP BY "tenantId"
`

// ListTotalActiveSlotsPerTenantRow is one tenant's total active slot count
// (the sum of its active workers' maxRuns).
type ListTotalActiveSlotsPerTenantRow struct {
	TenantId         pgtype.UUID `json:"tenantId"`
	TotalActiveSlots int64       `json:"totalActiveSlots"`
}

// ListTotalActiveSlotsPerTenant returns the total number of active worker
// slots per tenant. Tenants with no active workers are absent.
func (q *Queries) ListTotalActiveSlotsPerTenant(ctx context.Context, db DBTX) ([]*ListTotalActiveSlotsPerTenantRow, error) {
	rows, err := db.Query(ctx, listTotalActiveSlotsPerTenant)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []*ListTotalActiveSlotsPerTenantRow
	for rows.Next() {
		var i ListTotalActiveSlotsPerTenantRow
		if err := rows.Scan(&i.TenantId, &i.TotalActiveSlots); err != nil {
			return nil, err
		}
		items = append(items, &i)
	}
	// Surface iteration errors that rows.Next silently stops on.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
const listWorkersWithSlotCount = `-- name: ListWorkersWithSlotCount :many
SELECT
workers.id, workers."createdAt", workers."updatedAt", workers."deletedAt", workers."tenantId", workers."lastHeartbeatAt", workers.name, workers."dispatcherId", workers."maxRuns", workers."isActive", workers."lastListenerEstablished", workers."isPaused", workers.type, workers."webhookId", workers.language, workers."languageVersion", workers.os, workers."runtimeExtra", workers."sdkVersion",

View File

@@ -262,6 +262,9 @@ type TaskRepository interface {
Cleanup(ctx context.Context) (bool, error)
GetTaskStats(ctx context.Context, tenantId string) (map[string]TaskStat, error)
FindOldestRunningTaskInsertedAt(ctx context.Context) (*time.Time, error)
FindOldestTaskInsertedAt(ctx context.Context) (*time.Time, error)
}
type TaskRepositoryImpl struct {
@@ -3861,3 +3864,31 @@ func (r *TaskRepositoryImpl) GetTaskStats(ctx context.Context, tenantId string)
return result, nil
}
// FindOldestRunningTaskInsertedAt returns the inserted-at timestamp of the
// first v1_task_runtime row in (task_id, task_inserted_at) order.
//
// NOTE(review): the generated query returns a non-nil struct alongside any
// error, so the t == nil branch below is effectively dead. When the table is
// empty, the Scan error (presumably pgx's "no rows" error -- verify) is
// returned instead of (nil, nil); confirm callers handle that case.
func (r *TaskRepositoryImpl) FindOldestRunningTaskInsertedAt(ctx context.Context) (*time.Time, error) {
	t, err := r.queries.FindOldestRunningTask(ctx, r.pool)

	if err != nil {
		return nil, err
	}

	if t == nil {
		return nil, nil
	}

	return &t.TaskInsertedAt.Time, nil
}
// FindOldestTaskInsertedAt returns the inserted-at timestamp of the first
// v1_task row in (id, inserted_at) order.
//
// NOTE(review): as with FindOldestRunningTaskInsertedAt, the generated query
// returns a non-nil struct alongside any error, so the t == nil branch is
// effectively dead; an empty table surfaces as a "no rows" error, not
// (nil, nil). Confirm callers handle that case.
func (r *TaskRepositoryImpl) FindOldestTaskInsertedAt(ctx context.Context) (*time.Time, error) {
	t, err := r.queries.FindOldestTask(ctx, r.pool)

	if err != nil {
		return nil, err
	}

	if t == nil {
		return nil, nil
	}

	return &t.InsertedAt.Time, nil
}

View File

@@ -18,6 +18,9 @@ type WorkerRepository interface {
ListWorkers(tenantId string, opts *repository.ListWorkersOpts) ([]*sqlcv1.ListWorkersWithSlotCountRow, error)
GetWorkerById(workerId string) (*sqlcv1.GetWorkerByIdRow, error)
ListWorkerState(tenantId, workerId string, maxRuns int) ([]*sqlcv1.ListSemaphoreSlotsWithStateForWorkerRow, []*dbsqlc.GetStepRunForEngineRow, error)
CountActiveSlotsPerTenant() (map[string]int64, error)
CountActiveWorkersPerTenant() (map[string]int64, error)
ListActiveSDKsPerTenant() (map[TenantIdSDKTuple]int64, error)
}
type workerRepository struct {
@@ -89,3 +92,74 @@ func (w *workerRepository) ListWorkerState(tenantId, workerId string, maxRuns in
return slots, []*dbsqlc.GetStepRunForEngineRow{}, nil
}
// CountActiveSlotsPerTenant returns the total number of active worker slots
// keyed by tenant ID (UUID rendered as a string).
func (w *workerRepository) CountActiveSlotsPerTenant() (map[string]int64, error) {
	rows, err := w.queries.ListTotalActiveSlotsPerTenant(context.Background(), w.pool)
	if err != nil {
		return nil, fmt.Errorf("could not list active slots per tenant: %w", err)
	}

	out := make(map[string]int64, len(rows))
	for _, r := range rows {
		out[r.TenantId.String()] = r.TotalActiveSlots
	}

	return out, nil
}
// SDK identifies a worker SDK build: the operating system it runs on, the
// implementation language, and the language/SDK version strings.
type SDK struct {
	OperatingSystem string
	Language        string
	LanguageVersion string
	SdkVersion      string
}

// TenantIdSDKTuple is a comparable map key pairing a tenant ID with one SDK
// combination; used to count active SDKs per tenant.
type TenantIdSDKTuple struct {
	TenantId string
	SDK      SDK
}
// ListActiveSDKsPerTenant returns the number of active workers keyed by
// (tenant ID, SDK identity).
func (w *workerRepository) ListActiveSDKsPerTenant() (map[TenantIdSDKTuple]int64, error) {
	rows, err := w.queries.ListActiveSDKsPerTenant(context.Background(), w.pool)
	if err != nil {
		return nil, fmt.Errorf("could not list active sdks per tenant: %w", err)
	}

	counts := make(map[TenantIdSDKTuple]int64, len(rows))
	for _, r := range rows {
		key := TenantIdSDKTuple{
			TenantId: r.TenantId.String(),
			SDK: SDK{
				OperatingSystem: r.Os,
				Language:        r.Language,
				LanguageVersion: r.LanguageVersion,
				SdkVersion:      r.SdkVersion,
			},
		}

		counts[key] = r.Count
	}

	return counts, nil
}
// CountActiveWorkersPerTenant returns the number of active workers keyed by
// tenant ID (UUID rendered as a string).
func (w *workerRepository) CountActiveWorkersPerTenant() (map[string]int64, error) {
	rows, err := w.queries.ListActiveWorkersPerTenant(context.Background(), w.pool)
	if err != nil {
		return nil, fmt.Errorf("could not list active workers per tenant: %w", err)
	}

	out := make(map[string]int64, len(rows))
	for _, r := range rows {
		out[r.TenantId.String()] = r.Count
	}

	return out, nil
}

236
pkg/telemetry/metrics.go Normal file
View File

@@ -0,0 +1,236 @@
package telemetry
import (
"context"
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
// MetricsRecorder provides a centralized way to record OTel metrics. All
// instruments are gauges created once by NewMetricsRecorder and recorded via
// the typed helper methods on this struct.
type MetricsRecorder struct {
	meter metric.Meter

	// Database health metrics
	dbBloatGauge                      metric.Int64Gauge   // count of bloated tables
	dbBloatPercentGauge               metric.Float64Gauge // dead-tuple % per table
	dbLongRunningQueriesGauge         metric.Int64Gauge
	dbQueryCacheHitRatioGauge         metric.Float64Gauge // hit ratio % per table
	dbLongRunningVacuumGauge          metric.Int64Gauge
	dbLastAutovacuumSecondsSinceGauge metric.Float64Gauge // seconds since last autovacuum

	// OLAP metrics (instance-wide, no per-tenant attribute)
	olapTempTableSizeDAGGauge  metric.Int64Gauge
	olapTempTableSizeTaskGauge metric.Int64Gauge
	yesterdayRunCountGauge     metric.Int64Gauge // runs from yesterday, by status

	// Worker metrics (per tenant)
	activeSlotsGauge   metric.Int64Gauge
	activeWorkersGauge metric.Int64Gauge
	activeSDKsGauge    metric.Int64Gauge // per tenant and SDK identity
}
// NewMetricsRecorder creates a new metrics recorder with all instruments
// registered on the global OTel meter provider.
//
// The ctx parameter is currently unused in the body; it is kept so the
// signature remains stable if instrument registration later needs a context.
func NewMetricsRecorder(ctx context.Context) (*MetricsRecorder, error) {
	meter := otel.Meter("hatchet.run/metrics")

	// Every instrument below is created identically, so two small helpers do
	// the repetitive work. They short-circuit once an error has been recorded,
	// which preserves first-failure-wins semantics: the error returned is the
	// one from the first gauge that failed, and no later gauges are created.
	var err error

	newInt64 := func(name, desc, what string) metric.Int64Gauge {
		if err != nil {
			return nil
		}

		g, gErr := meter.Int64Gauge(name, metric.WithDescription(desc))
		if gErr != nil {
			err = fmt.Errorf("failed to create %s: %w", what, gErr)
		}

		return g
	}

	newFloat64 := func(name, desc, what string) metric.Float64Gauge {
		if err != nil {
			return nil
		}

		g, gErr := meter.Float64Gauge(name, metric.WithDescription(desc))
		if gErr != nil {
			err = fmt.Errorf("failed to create %s: %w", what, gErr)
		}

		return g
	}

	recorder := &MetricsRecorder{
		meter: meter,

		// Database health metrics
		dbBloatGauge:                      newInt64("hatchet.db.bloat.count", "Number of bloated tables detected in the database", "db bloat gauge"),
		dbBloatPercentGauge:               newFloat64("hatchet.db.bloat.dead_tuple_percent", "Percentage of dead tuples per table", "db bloat percent gauge"),
		dbLongRunningQueriesGauge:         newInt64("hatchet.db.long_running_queries.count", "Number of long-running queries detected in the database", "long running queries gauge"),
		dbQueryCacheHitRatioGauge:         newFloat64("hatchet.db.query_cache.hit_ratio", "Query cache hit ratio percentage for tables", "query cache hit ratio gauge"),
		dbLongRunningVacuumGauge:          newInt64("hatchet.db.long_running_vacuum.count", "Number of long-running vacuum operations detected in the database", "long running vacuum gauge"),
		dbLastAutovacuumSecondsSinceGauge: newFloat64("hatchet.db.last_autovacuum.seconds_since", "Seconds since last autovacuum for partitioned tables", "last autovacuum gauge"),

		// OLAP metrics (instance-wide)
		olapTempTableSizeDAGGauge:  newInt64("hatchet.olap.temp_table_size.dag_status_updates", "Size of temporary table for DAG status updates (instance-wide)", "OLAP DAG temp table size gauge"),
		olapTempTableSizeTaskGauge: newInt64("hatchet.olap.temp_table_size.task_status_updates", "Size of temporary table for task status updates (instance-wide)", "OLAP task temp table size gauge"),
		yesterdayRunCountGauge:     newInt64("hatchet.olap.yesterday_run_count", "Number of workflow runs from yesterday by status (instance-wide)", "yesterday run count gauge"),

		// Worker metrics
		activeSlotsGauge:   newInt64("hatchet.workers.active_slots", "Number of active worker slots per tenant", "active slots gauge"),
		activeWorkersGauge: newInt64("hatchet.workers.active_count", "Number of active workers per tenant", "active workers gauge"),
		activeSDKsGauge:    newInt64("hatchet.workers.active_sdks", "Number of active SDKs per tenant and SDK version", "active SDKs gauge"),
	}

	if err != nil {
		return nil, err
	}

	return recorder, nil
}
// RecordDBBloat records the number of bloated tables detected, tagged with
// the overall health status string.
func (m *MetricsRecorder) RecordDBBloat(ctx context.Context, count int64, healthStatus string) {
	attrs := metric.WithAttributes(attribute.String("health_status", healthStatus))
	m.dbBloatGauge.Record(ctx, count, attrs)
}
// RecordDBBloatPercent records the dead tuple percentage for a specific
// table, tagged with the table name.
func (m *MetricsRecorder) RecordDBBloatPercent(ctx context.Context, tableName string, deadPercent float64) {
	attrs := metric.WithAttributes(attribute.String("table_name", tableName))
	m.dbBloatPercentGauge.Record(ctx, deadPercent, attrs)
}
// RecordDBLongRunningQueries records the number of long-running queries
// currently observed in the database (no extra attributes attached).
func (m *MetricsRecorder) RecordDBLongRunningQueries(ctx context.Context, count int64) {
	m.dbLongRunningQueriesGauge.Record(ctx, count)
}
// RecordDBQueryCacheHitRatio records the query cache hit ratio for a table,
// tagged with the table name.
func (m *MetricsRecorder) RecordDBQueryCacheHitRatio(ctx context.Context, tableName string, hitRatio float64) {
	attrs := metric.WithAttributes(attribute.String("table_name", tableName))
	m.dbQueryCacheHitRatioGauge.Record(ctx, hitRatio, attrs)
}
// RecordDBLongRunningVacuum records the number of long-running vacuum
// operations detected, tagged with the overall health status string.
func (m *MetricsRecorder) RecordDBLongRunningVacuum(ctx context.Context, count int64, healthStatus string) {
	attrs := metric.WithAttributes(attribute.String("health_status", healthStatus))
	m.dbLongRunningVacuumGauge.Record(ctx, count, attrs)
}
// RecordDBLastAutovacuumSecondsSince records the seconds elapsed since the
// last autovacuum of a partitioned table, tagged with the table name.
func (m *MetricsRecorder) RecordDBLastAutovacuumSecondsSince(ctx context.Context, tableName string, seconds float64) {
	attrs := metric.WithAttributes(attribute.String("table_name", tableName))
	m.dbLastAutovacuumSecondsSinceGauge.Record(ctx, seconds, attrs)
}
// RecordOLAPTempTableSizeDAG records the size of the OLAP DAG status updates
// temp table (instance-wide; no per-tenant attribute).
func (m *MetricsRecorder) RecordOLAPTempTableSizeDAG(ctx context.Context, size int64) {
	m.olapTempTableSizeDAGGauge.Record(ctx, size)
}
// RecordOLAPTempTableSizeTask records the size of the OLAP task status
// updates temp table (instance-wide; no per-tenant attribute).
func (m *MetricsRecorder) RecordOLAPTempTableSizeTask(ctx context.Context, size int64) {
	m.olapTempTableSizeTaskGauge.Record(ctx, size)
}
// RecordYesterdayRunCount records the number of workflow runs from yesterday
// for one run status (instance-wide), tagged with that status.
func (m *MetricsRecorder) RecordYesterdayRunCount(ctx context.Context, status string, count int64) {
	attrs := metric.WithAttributes(attribute.String("status", status))
	m.yesterdayRunCountGauge.Record(ctx, count, attrs)
}
// RecordActiveSlots records the number of active worker slots for a tenant,
// tagged with the tenant ID.
func (m *MetricsRecorder) RecordActiveSlots(ctx context.Context, tenantId string, count int64) {
	attrs := metric.WithAttributes(attribute.String("tenant_id", tenantId))
	m.activeSlotsGauge.Record(ctx, count, attrs)
}
// RecordActiveWorkers records the number of active workers for a tenant,
// tagged with the tenant ID.
func (m *MetricsRecorder) RecordActiveWorkers(ctx context.Context, tenantId string, count int64) {
	attrs := metric.WithAttributes(attribute.String("tenant_id", tenantId))
	m.activeWorkersGauge.Record(ctx, count, attrs)
}
// RecordActiveSDKs records the number of active SDKs for a tenant, labelled
// with the SDK's language, versions, and operating system.
func (m *MetricsRecorder) RecordActiveSDKs(ctx context.Context, tenantId string, sdk SDKInfo, count int64) {
	attrs := metric.WithAttributes(
		attribute.String("tenant_id", tenantId),
		attribute.String("sdk_language", sdk.Language),
		attribute.String("sdk_version", sdk.SdkVersion),
		attribute.String("sdk_os", sdk.OperatingSystem),
		attribute.String("sdk_language_version", sdk.LanguageVersion),
	)

	m.activeSDKsGauge.Record(ctx, count, attrs)
}
// SDKInfo contains information about an SDK: the operating system it runs
// on, the implementation language, and the language/SDK version strings.
// It is the telemetry-side counterpart of the attribute set recorded by
// RecordActiveSDKs.
type SDKInfo struct {
	OperatingSystem string
	Language        string
	LanguageVersion string
	SdkVersion      string
}

View File

@@ -11,9 +11,11 @@ import (
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
@@ -106,6 +108,89 @@ func InitTracer(opts *TracerOpts) (func(context.Context) error, error) {
return exporter.Shutdown, nil
}
// InitMeter configures the global OTel meter provider to export metrics to
// the configured collector over OTLP/gRPC, and returns a shutdown function
// that flushes and tears down the provider and exporter. When no collector
// URL is configured, metrics export is disabled and a no-op shutdown
// function is returned.
func InitMeter(opts *TracerOpts) (func(context.Context) error, error) {
	if opts.CollectorURL == "" {
		// Metrics export disabled: no-op shutdown.
		return func(context.Context) error {
			return nil
		}, nil
	}

	var secureOption otlpmetricgrpc.Option

	if !opts.Insecure {
		secureOption = otlpmetricgrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, ""))
	} else {
		secureOption = otlpmetricgrpc.WithInsecure()
	}

	exporter, err := otlpmetricgrpc.New(
		context.Background(),
		secureOption,
		otlpmetricgrpc.WithEndpoint(opts.CollectorURL),
		otlpmetricgrpc.WithHeaders(map[string]string{
			"Authorization": opts.CollectorAuth,
		}),
	)

	if err != nil {
		return nil, fmt.Errorf("failed to create exporter: %w", err)
	}

	resourceAttrs := []attribute.KeyValue{
		attribute.String("service.name", opts.ServiceName),
		attribute.String("library.language", "go"),
	}

	// Add Kubernetes pod information if available
	if podName := os.Getenv("K8S_POD_NAME"); podName != "" {
		resourceAttrs = append(resourceAttrs, attribute.String("k8s.pod.name", podName))
	}

	if podNamespace := os.Getenv("K8S_POD_NAMESPACE"); podNamespace != "" {
		resourceAttrs = append(resourceAttrs, attribute.String("k8s.namespace.name", podNamespace))
	}

	resources, err := resource.New(
		context.Background(),
		resource.WithAttributes(resourceAttrs...),
	)

	if err != nil {
		return nil, fmt.Errorf("failed to set resources: %w", err)
	}

	meterProvider := metric.NewMeterProvider(
		metric.WithReader(
			metric.NewPeriodicReader(
				exporter,
				metric.WithInterval(3*time.Second),
			),
		),
		metric.WithResource(resources),
	)

	otel.SetMeterProvider(
		meterProvider,
	)

	return func(ctx context.Context) error {
		var shutdownErr error

		if err := meterProvider.Shutdown(ctx); err != nil {
			shutdownErr = fmt.Errorf("failed to shutdown meter provider: %w", err)
		}

		// NOTE(review): provider shutdown is documented to also shut down its
		// readers (and the periodic reader its exporter); this explicit call
		// is belt-and-braces cleanup in case provider shutdown failed early.
		if err := exporter.Shutdown(ctx); err != nil {
			if shutdownErr != nil {
				// Wrap BOTH errors with %w (Go 1.20+) so errors.Is/errors.As
				// can still match either one; the previous %v flattened the
				// provider error out of the chain.
				shutdownErr = fmt.Errorf("%w; failed to shutdown exporter: %w", shutdownErr, err)
			} else {
				shutdownErr = fmt.Errorf("failed to shutdown exporter: %w", err)
			}
		}

		return shutdownErr
	}, nil
}
func NewSpan(ctx context.Context, name string) (context.Context, trace.Span) {
ctx, span := otel.Tracer("").Start(ctx, prefixSpanKey(name))
return ctx, span

109
sql/schema/pg-stubs.sql Normal file
View File

@@ -0,0 +1,109 @@
-- these are stub views to satisfy the sqlc generator for pg features
-- Stub of the pg_stat_user_tables statistics view: matches the column names
-- and types our queries reference, and never returns rows (WHERE false).
CREATE VIEW pg_stat_user_tables AS
SELECT
NULL::oid AS relid,
NULL::name AS schemaname,
NULL::name AS relname,
NULL::bigint AS seq_scan,
NULL::bigint AS seq_tup_read,
NULL::bigint AS idx_scan,
NULL::bigint AS idx_tup_fetch,
NULL::bigint AS n_tup_ins,
NULL::bigint AS n_tup_upd,
NULL::bigint AS n_tup_del,
NULL::bigint AS n_tup_hot_upd,
NULL::bigint AS n_live_tup,
NULL::bigint AS n_dead_tup,
NULL::bigint AS n_mod_since_analyze,
NULL::bigint AS n_ins_since_vacuum,
NULL::bigint AS vacuum_count,
NULL::bigint AS autovacuum_count,
NULL::bigint AS analyze_count,
NULL::bigint AS autoanalyze_count,
NULL::timestamp with time zone AS last_vacuum,
NULL::timestamp with time zone AS last_autovacuum,
NULL::timestamp with time zone AS last_analyze,
NULL::timestamp with time zone AS last_autoanalyze
WHERE
false;
-- Stub of the pg_statio_user_tables I/O statistics view; returns no rows.
CREATE VIEW pg_statio_user_tables AS
SELECT
NULL::oid AS relid,
NULL::name AS schemaname,
NULL::name AS relname,
NULL::bigint AS heap_blks_read,
NULL::bigint AS heap_blks_hit,
NULL::bigint AS idx_blks_read,
NULL::bigint AS idx_blks_hit,
NULL::bigint AS toast_blks_read,
NULL::bigint AS toast_blks_hit,
NULL::bigint AS tidx_blks_read,
NULL::bigint AS tidx_blks_hit
WHERE
false;
-- Stub of the pg_available_extensions catalog view; returns no rows.
CREATE VIEW pg_available_extensions AS
SELECT
NULL::name AS name,
NULL::text AS default_version,
NULL::text AS installed_version,
NULL::text AS comment
WHERE
false;
-- Stub of the pg_stat_statements extension view (normally only present when
-- the extension is installed); returns no rows.
CREATE VIEW pg_stat_statements AS
SELECT
NULL::oid AS userid,
NULL::oid AS dbid,
NULL::bigint AS queryid,
NULL::text AS query,
NULL::bigint AS calls,
NULL::double precision AS total_exec_time,
NULL::bigint AS rows,
NULL::bigint AS shared_blks_hit,
NULL::bigint AS shared_blks_read,
NULL::bigint AS shared_blks_dirtied,
NULL::bigint AS shared_blks_written,
NULL::bigint AS local_blks_hit,
NULL::bigint AS local_blks_read,
NULL::bigint AS local_blks_dirtied,
NULL::bigint AS local_blks_written,
NULL::bigint AS temp_blks_read,
NULL::bigint AS temp_blks_written,
NULL::double precision AS blk_read_time,
NULL::double precision AS blk_write_time
WHERE
false;
-- Stub of the pg_stat_progress_vacuum progress-reporting view; returns no
-- rows. NOTE(review): the column set here varies across PostgreSQL major
-- versions -- confirm it matches the server versions we target.
CREATE VIEW pg_stat_progress_vacuum AS
SELECT
NULL::integer AS pid,
NULL::oid AS datid,
NULL::name AS datname,
NULL::oid AS relid,
NULL::text AS phase,
NULL::bigint AS heap_blks_total,
NULL::bigint AS heap_blks_scanned,
NULL::bigint AS heap_blks_vacuumed,
NULL::bigint AS heap_blks_frozen,
NULL::bigint AS index_vacuum_count,
NULL::bigint AS max_dead_tuples,
NULL::bigint AS num_dead_tuples
WHERE
false;
-- Stub of the pg_stat_activity view, reduced to the columns our queries
-- reference; returns no rows.
CREATE VIEW pg_stat_activity AS
SELECT
NULL::integer AS pid,
NULL::name AS usename,
NULL::text AS application_name,
NULL::inet AS client_addr,
NULL::text AS state,
NULL::timestamp with time zone AS query_start,
NULL::text AS wait_event_type,
NULL::text AS wait_event,
NULL::text AS query
WHERE
false;