From c8608c18aeeff4a251b1fa72a96e5e500bd7d975 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Singh Date: Tue, 2 Dec 2025 12:27:08 +0530 Subject: [PATCH] fix: deadlock in prom rule (#9741) --- go.mod | 3 - pkg/prometheus/prometheustest/provider.go | 61 +- pkg/query-service/app/querier/querier_test.go | 8 +- .../app/querier/v2/querier_test.go | 8 +- pkg/query-service/rules/prom_rule.go | 176 ++--- pkg/query-service/rules/promrule_test.go | 617 +++++++++++++++++- .../rules/threshold_rule_test.go | 26 +- 7 files changed, 751 insertions(+), 148 deletions(-) diff --git a/go.mod b/go.mod index 4e1f65ed8e..14309a38cb 100644 --- a/go.mod +++ b/go.mod @@ -114,7 +114,6 @@ require ( github.com/armon/go-metrics v0.4.1 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/aws/aws-sdk-go v1.55.7 // indirect - github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect github.com/beevik/etree v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect @@ -158,7 +157,6 @@ require ( github.com/golang/snappy v1.0.0 // indirect github.com/google/btree v1.1.3 // indirect github.com/google/cel-go v0.26.1 // indirect - github.com/google/go-cmp v0.7.0 // indirect github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect github.com/googleapis/gax-go/v2 v2.14.2 // indirect @@ -324,7 +322,6 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.38.0 go.opentelemetry.io/proto/otlp v1.8.0 // indirect go.uber.org/atomic v1.11.0 // indirect - go.uber.org/goleak v1.3.0 // indirect go.uber.org/mock v0.6.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/mod v0.27.0 // indirect diff --git a/pkg/prometheus/prometheustest/provider.go b/pkg/prometheus/prometheustest/provider.go index abd92b66a7..4896fd7ef1 100644 --- a/pkg/prometheus/prometheustest/provider.go +++ b/pkg/prometheus/prometheustest/provider.go @@ -1,55 +1,42 @@ package prometheustest import ( - "log/slog" - "os" - "time" + "context" + "github.com/SigNoz/signoz/pkg/factory" "github.com/SigNoz/signoz/pkg/prometheus" + "github.com/SigNoz/signoz/pkg/prometheus/clickhouseprometheus" + "github.com/SigNoz/signoz/pkg/telemetrystore" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/storage/remote" ) var _ prometheus.Prometheus = (*Provider)(nil) type Provider struct { - db *tsdb.DB - dir string - engine *prometheus.Engine + queryable storage.SampleAndChunkQueryable + engine *prometheus.Engine } -func New(logger *slog.Logger, cfg prometheus.Config, outOfOrderTimeWindow ...int64) *Provider { - dir, err := os.MkdirTemp("", "test_storage") - if err != nil { - panic(err) - } +var stCallback = func() (int64, error) { + return int64(model.Latest), nil +} - // Tests just load data for a series sequentially. Thus we - // need a long appendable window. - opts := tsdb.DefaultOptions() - opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond) - opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond) - opts.RetentionDuration = 0 - opts.EnableNativeHistograms = true +func New(ctx context.Context, providerSettings factory.ProviderSettings, config prometheus.Config, telemetryStore telemetrystore.TelemetryStore) *Provider { - // Set OutOfOrderTimeWindow if provided, otherwise use default (0) - if len(outOfOrderTimeWindow) > 0 { - opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0] - } else { - opts.OutOfOrderTimeWindow = 0 // Default value is zero - } + settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/prometheus/prometheustest") - db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats()) - if err != nil { - panic(err) - } + engine := prometheus.NewEngine(settings.Logger(), config) - engine := prometheus.NewEngine(logger, cfg) + readClient := clickhouseprometheus.NewReadClient(settings, telemetryStore) + + queryable := remote.NewSampleAndChunkQueryableClient(readClient, labels.EmptyLabels(), []*labels.Matcher{}, false, stCallback) return &Provider{ - db: db, - dir: dir, - engine: engine, + engine: engine, + queryable: queryable, } } @@ -58,12 +45,12 @@ func (provider *Provider) Engine() *prometheus.Engine { } func (provider *Provider) Storage() storage.Queryable { - return provider.db + return provider.queryable } func (provider *Provider) Close() error { - if err := provider.db.Close(); err != nil { - return err + if provider.engine != nil { + provider.engine.Close() } - return os.RemoveAll(provider.dir) + return nil } diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index 3c195e2fee..dc18c1722e 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -1405,7 +1405,7 @@ func Test_querier_Traces_runWindowBasedListQueryDesc(t *testing.T) { reader := clickhouseReader.NewReader( nil, telemetryStore, - prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, @@ -1630,7 +1630,7 @@ func Test_querier_Traces_runWindowBasedListQueryAsc(t *testing.T) { reader := clickhouseReader.NewReader( nil, telemetryStore, - prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, @@ -1930,7 +1930,7 @@ func Test_querier_Logs_runWindowBasedListQueryDesc(t *testing.T) { reader := clickhouseReader.NewReader( nil, telemetryStore, - prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, @@ -2157,7 +2157,7 @@ func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) { reader := clickhouseReader.NewReader( nil, telemetryStore, - prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go index 05322b82c2..673f19e647 100644 --- a/pkg/query-service/app/querier/v2/querier_test.go +++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -1457,7 +1457,7 @@ func Test_querier_Traces_runWindowBasedListQueryDesc(t *testing.T) { reader := clickhouseReader.NewReader( nil, telemetryStore, - prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, @@ -1682,7 +1682,7 @@ func Test_querier_Traces_runWindowBasedListQueryAsc(t *testing.T) { reader := clickhouseReader.NewReader( nil, telemetryStore, - prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, @@ -1981,7 +1981,7 @@ func Test_querier_Logs_runWindowBasedListQueryDesc(t *testing.T) { reader := clickhouseReader.NewReader( nil, telemetryStore, - prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, @@ -2208,7 +2208,7 @@ func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) { reader := clickhouseReader.NewReader( nil, telemetryStore, - prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), + prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, diff --git a/pkg/query-service/rules/prom_rule.go b/pkg/query-service/rules/prom_rule.go index 0f46d76068..d7c5ca952f 100644 --- a/pkg/query-service/rules/prom_rule.go +++ b/pkg/query-service/rules/prom_rule.go @@ -119,15 +119,10 @@ func (r *PromRule) getPqlQuery() (string, error) { return "", fmt.Errorf("invalid promql rule query") } -func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) { - - prevState := r.State() - +func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletypes.Vector, error) { start, end := r.Timestamps(ts) interval := 60 * time.Second // TODO(srikanthccv): this should be configurable - valueFormatter := formatter.FromUnit(r.Unit()) - q, err := r.getPqlQuery() if err != nil { return nil, err @@ -140,12 +135,35 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) return nil, err } + var resultVector ruletypes.Vector + for _, series := range res { + resultSeries, err := r.Threshold.Eval(toCommonSeries(series), r.Unit(), ruletypes.EvalData{ + ActiveAlerts: r.ActiveAlertsLabelFP(), + }) + if err != nil { + return nil, err + } + resultVector = append(resultVector, resultSeries...) + } + return resultVector, nil +} + +func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) { + prevState := r.State() + valueFormatter := formatter.FromUnit(r.Unit()) + + // prepare query, run query get data and filter the data based on the threshold + results, err := r.buildAndRunQuery(ctx, ts) + if err != nil { + return nil, err + } + r.mtx.Lock() defer r.mtx.Unlock() resultFPs := map[uint64]struct{}{} - var alerts = make(map[uint64]*ruletypes.Alert, len(res)) + var alerts = make(map[uint64]*ruletypes.Alert, len(results)) ruleReceivers := r.Threshold.GetRuleReceivers() ruleReceiverMap := make(map[string][]string) @@ -153,90 +171,76 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) ruleReceiverMap[value.Name] = value.Channels } - for _, series := range res { + for _, result := range results { + l := make(map[string]string, len(result.Metric)) + for _, lbl := range result.Metric { + l[lbl.Name] = lbl.Value + } + r.logger.DebugContext(ctx, "alerting for series", "rule_name", r.Name(), "series", result) - if len(series.Floats) == 0 { - continue + threshold := valueFormatter.Format(result.Target, result.TargetUnit) + + tmplData := ruletypes.AlertTemplateData(l, valueFormatter.Format(result.V, r.Unit()), threshold) + // Inject some convenience variables that are easier to remember for users + // who are not used to Go's templating system. + defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}" + + expand := func(text string) string { + + tmpl := ruletypes.NewTemplateExpander( + ctx, + defs+text, + "__alert_"+r.Name(), + tmplData, + times.Time(timestamp.FromTime(ts)), + nil, + ) + result, err := tmpl.Expand() + if err != nil { + result = fmt.Sprintf("", err) + r.logger.WarnContext(ctx, "Expanding alert template failed", "rule_name", r.Name(), "error", err, "data", tmplData) + } + return result } - results, err := r.Threshold.Eval(toCommonSeries(series), r.Unit(), ruletypes.EvalData{ - ActiveAlerts: r.ActiveAlertsLabelFP(), - }) - if err != nil { + lb := qslabels.NewBuilder(result.Metric).Del(qslabels.MetricNameLabel) + resultLabels := qslabels.NewBuilder(result.Metric).Del(qslabels.MetricNameLabel).Labels() + + for name, value := range r.labels.Map() { + lb.Set(name, expand(value)) + } + + lb.Set(qslabels.AlertNameLabel, r.Name()) + lb.Set(qslabels.AlertRuleIdLabel, r.ID()) + lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL()) + + annotations := make(qslabels.Labels, 0, len(r.annotations.Map())) + for name, value := range r.annotations.Map() { + annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)}) + } + + lbs := lb.Labels() + h := lbs.Hash() + resultFPs[h] = struct{}{} + + if _, ok := alerts[h]; ok { + err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels") + // We have already acquired the lock above hence using SetHealth and + // SetLastError will deadlock. + r.health = ruletypes.HealthBad + r.lastError = err return nil, err } - - for _, result := range results { - l := make(map[string]string, len(series.Metric)) - for _, lbl := range series.Metric { - l[lbl.Name] = lbl.Value - } - r.logger.DebugContext(ctx, "alerting for series", "rule_name", r.Name(), "series", series) - - threshold := valueFormatter.Format(result.Target, result.TargetUnit) - - tmplData := ruletypes.AlertTemplateData(l, valueFormatter.Format(result.V, r.Unit()), threshold) - // Inject some convenience variables that are easier to remember for users - // who are not used to Go's templating system. - defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}" - - expand := func(text string) string { - - tmpl := ruletypes.NewTemplateExpander( - ctx, - defs+text, - "__alert_"+r.Name(), - tmplData, - times.Time(timestamp.FromTime(ts)), - nil, - ) - result, err := tmpl.Expand() - if err != nil { - result = fmt.Sprintf("", err) - r.logger.WarnContext(ctx, "Expanding alert template failed", "rule_name", r.Name(), "error", err, "data", tmplData) - } - return result - } - - lb := qslabels.NewBuilder(result.Metric).Del(qslabels.MetricNameLabel) - resultLabels := qslabels.NewBuilder(result.Metric).Del(qslabels.MetricNameLabel).Labels() - - for name, value := range r.labels.Map() { - lb.Set(name, expand(value)) - } - - lb.Set(qslabels.AlertNameLabel, r.Name()) - lb.Set(qslabels.AlertRuleIdLabel, r.ID()) - lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL()) - - annotations := make(qslabels.Labels, 0, len(r.annotations.Map())) - for name, value := range r.annotations.Map() { - annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)}) - } - - lbs := lb.Labels() - h := lbs.Hash() - resultFPs[h] = struct{}{} - - if _, ok := alerts[h]; ok { - err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels") - // We have already acquired the lock above hence using SetHealth and - // SetLastError will deadlock. - r.health = ruletypes.HealthBad - r.lastError = err - return nil, err - } - alerts[h] = &ruletypes.Alert{ - Labels: lbs, - QueryResultLables: resultLabels, - Annotations: annotations, - ActiveAt: ts, - State: model.StatePending, - Value: result.V, - GeneratorURL: r.GeneratorURL(), - Receivers: ruleReceiverMap[lbs.Map()[ruletypes.LabelThresholdName]], - IsRecovering: result.IsRecovering, - } + alerts[h] = &ruletypes.Alert{ + Labels: lbs, + QueryResultLables: resultLabels, + Annotations: annotations, + ActiveAt: ts, + State: model.StatePending, + Value: result.V, + GeneratorURL: r.GeneratorURL(), + Receivers: ruleReceiverMap[lbs.Map()[ruletypes.LabelThresholdName]], + IsRecovering: result.IsRecovering, } } diff --git a/pkg/query-service/rules/promrule_test.go b/pkg/query-service/rules/promrule_test.go index a1a1cc69ae..55c04ebaef 100644 --- a/pkg/query-service/rules/promrule_test.go +++ b/pkg/query-service/rules/promrule_test.go @@ -1,14 +1,23 @@ package rules import ( + "context" + "strings" "testing" "time" "github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest" + "github.com/SigNoz/signoz/pkg/prometheus" + "github.com/SigNoz/signoz/pkg/prometheus/prometheustest" + "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" + qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels" + "github.com/SigNoz/signoz/pkg/telemetrystore" + "github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest" ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes" "github.com/SigNoz/signoz/pkg/valuer" pql "github.com/prometheus/prometheus/promql" + cmock "github.com/srikanthccv/ClickHouse-go-mock" "github.com/stretchr/testify/assert" ) @@ -723,6 +732,612 @@ func TestPromRuleEval(t *testing.T) { assert.Empty(t, resultVectors, "Expected no alert but got result vectors for case %d", idx) } } - + } +} + +func TestPromRuleUnitCombinations(t *testing.T) { + // fixed base time for deterministic tests + baseTime := time.Unix(1700000000, 0) + evalTime := baseTime.Add(5 * time.Minute) + + postableRule := ruletypes.PostableRule{ + AlertName: "Units test", + AlertType: ruletypes.AlertTypeMetric, + RuleType: ruletypes.RuleTypeProm, + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ + EvalWindow: ruletypes.Duration(5 * time.Minute), + Frequency: ruletypes.Duration(1 * time.Minute), + }}, + RuleCondition: &ruletypes.RuleCondition{ + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypePromQL, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "test_metric", + }, + }, + }, + }, + } + + // time_series_v4 cols of interest + fingerprintCols := []cmock.ColumnType{ + {Name: "fingerprint", Type: "UInt64"}, + {Name: "any(labels)", Type: "String"}, + } + + // samples_v4 columns + samplesCols := []cmock.ColumnType{ + {Name: "metric_name", Type: "String"}, + {Name: "fingerprint", Type: "UInt64"}, + {Name: "unix_milli", Type: "Int64"}, + {Name: "value", Type: "Float64"}, + {Name: "flags", Type: "UInt32"}, + } + + // see Timestamps on base_rule + evalWindowMs := int64(5 * 60 * 1000) // 5 minutes in ms + evalTimeMs := evalTime.UnixMilli() + queryStart := ((evalTimeMs-2*evalWindowMs)/60000)*60000 + 1 // truncate to minute + 1ms + queryEnd := (evalTimeMs / 60000) * 60000 // truncate to minute + + cases := []struct { + targetUnit string + yAxisUnit string + values []struct { + timestamp time.Time + value float64 + } + expectAlerts int + compareOp string + matchType string + target float64 + summaryAny []string + }{ + { + targetUnit: "s", + yAxisUnit: "ns", + values: []struct { + timestamp time.Time + value float64 + }{ + {baseTime, 572588400}, // 0.57 seconds + {baseTime.Add(1 * time.Minute), 572386400}, // 0.57 seconds + {baseTime.Add(2 * time.Minute), 300947400}, // 0.3 seconds + {baseTime.Add(3 * time.Minute), 299316000}, // 0.3 seconds + {baseTime.Add(4 * time.Minute), 66640400.00000001}, // 0.06 seconds + }, + expectAlerts: 0, + compareOp: "1", // Above + matchType: "1", // Once + target: 1, // 1 second + }, + { + targetUnit: "ms", + yAxisUnit: "ns", + values: []struct { + timestamp time.Time + value float64 + }{ + {baseTime, 572588400}, // 572.58 ms + {baseTime.Add(1 * time.Minute), 572386400}, // 572.38 ms + {baseTime.Add(2 * time.Minute), 300947400}, // 300.94 ms + {baseTime.Add(3 * time.Minute), 299316000}, // 299.31 ms + {baseTime.Add(4 * time.Minute), 66640400.00000001}, // 66.64 ms + }, + expectAlerts: 1, + compareOp: "1", // Above + matchType: "1", // Once + target: 200, // 200 ms + summaryAny: []string{ + "observed metric value is 299 ms", + "the observed metric value is 573 ms", + "the observed metric value is 572 ms", + "the observed metric value is 301 ms", + }, + }, + { + targetUnit: "decgbytes", + yAxisUnit: "bytes", + values: []struct { + timestamp time.Time + value float64 + }{ + {baseTime, 2863284053}, // 2.86 GB + {baseTime.Add(1 * time.Minute), 2863388842}, // 2.86 GB + {baseTime.Add(2 * time.Minute), 300947400}, // 0.3 GB + {baseTime.Add(3 * time.Minute), 299316000}, // 0.3 GB + {baseTime.Add(4 * time.Minute), 66640400.00000001}, // 66.64 MB + }, + expectAlerts: 0, + compareOp: "1", // Above + matchType: "1", // Once + target: 200, // 200 GB + }, + { + targetUnit: "decgbytes", + yAxisUnit: "By", + values: []struct { + timestamp time.Time + value float64 + }{ + {baseTime, 2863284053}, // 2.86 GB + {baseTime.Add(1 * time.Minute), 2863388842}, // 2.86 GB + {baseTime.Add(2 * time.Minute), 300947400}, // 0.3 GB + {baseTime.Add(3 * time.Minute), 299316000}, // 0.3 GB + {baseTime.Add(4 * time.Minute), 66640400.00000001}, // 66.64 MB + }, + expectAlerts: 0, + compareOp: "1", // Above + matchType: "1", // Once + target: 200, // 200 GB + }, + { + targetUnit: "h", + yAxisUnit: "min", + values: []struct { + timestamp time.Time + value float64 + }{ + {baseTime, 55}, // 55 minutes + {baseTime.Add(1 * time.Minute), 57}, // 57 minutes + {baseTime.Add(2 * time.Minute), 30}, // 30 minutes + {baseTime.Add(3 * time.Minute), 29}, // 29 minutes + }, + expectAlerts: 0, + compareOp: "1", // Above + matchType: "1", // Once + target: 1, // 1 hour + }, + } + + logger := instrumentationtest.New().Logger() + + for idx, c := range cases { + telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{}) + + // single fingerprint with labels JSON + fingerprint := uint64(12345) + labelsJSON := `{"__name__":"test_metric"}` + fingerprintData := [][]interface{}{ + {fingerprint, labelsJSON}, + } + fingerprintRows := cmock.NewRows(fingerprintCols, fingerprintData) + + // create samples data from test case values + samplesData := make([][]interface{}, len(c.values)) + for i, v := range c.values { + samplesData[i] = []interface{}{ + "test_metric", + fingerprint, + v.timestamp.UnixMilli(), + v.value, + uint32(0), // flags - 0 means normal value, 1 means stale, we are not doing staleness tests + } + } + samplesRows := cmock.NewRows(samplesCols, samplesData) + + // args: $1=metric_name, $2=label_name, $3=label_value + telemetryStore.Mock(). + ExpectQuery("SELECT fingerprint, any"). + WithArgs("test_metric", "__name__", "test_metric"). + WillReturnRows(fingerprintRows) + + // args: $1=metric_name (outer), $2=metric_name (subquery), $3=label_name, $4=label_value, $5=start, $6=end + telemetryStore.Mock(). + ExpectQuery("SELECT metric_name, fingerprint, unix_milli"). + WithArgs( + "test_metric", + "test_metric", + "__name__", + "test_metric", + queryStart, + queryEnd, + ). + WillReturnRows(samplesRows) + + promProvider := prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore) + + postableRule.RuleCondition.CompareOp = ruletypes.CompareOp(c.compareOp) + postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType) + postableRule.RuleCondition.Target = &c.target + postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit + postableRule.RuleCondition.TargetUnit = c.targetUnit + postableRule.RuleCondition.Thresholds = &ruletypes.RuleThresholdData{ + Kind: ruletypes.BasicThresholdKind, + Spec: ruletypes.BasicRuleThresholds{ + { + Name: postableRule.AlertName, + TargetValue: &c.target, + TargetUnit: c.targetUnit, + MatchType: ruletypes.MatchType(c.matchType), + CompareOp: ruletypes.CompareOp(c.compareOp), + }, + }, + } + postableRule.Annotations = map[string]string{ + "description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})", + "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", + } + + options := clickhouseReader.NewOptions("", "", "archiveNamespace") + reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Duration(time.Second), nil, nil, options) + rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider) + if err != nil { + assert.NoError(t, err) + promProvider.Close() + continue + } + + retVal, err := rule.Eval(context.Background(), evalTime) + if err != nil { + assert.NoError(t, err) + promProvider.Close() + continue + } + + assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx) + if c.expectAlerts != 0 { + foundCount := 0 + for _, item := range rule.Active { + for _, summary := range c.summaryAny { + if strings.Contains(item.Annotations.Get("summary"), summary) { + foundCount++ + break + } + } + } + assert.Equal(t, c.expectAlerts, foundCount, "case %d", idx) + } + + promProvider.Close() + } +} + +// TODO(abhishekhugetech): enable this +func _Enable_this_after_9146_issue_fix_is_merged_TestPromRuleNoData(t *testing.T) { + baseTime := time.Unix(1700000000, 0) + evalTime := baseTime.Add(5 * time.Minute) + + postableRule := ruletypes.PostableRule{ + AlertName: "No data test", + AlertType: ruletypes.AlertTypeMetric, + RuleType: ruletypes.RuleTypeProm, + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ + EvalWindow: ruletypes.Duration(5 * time.Minute), + Frequency: ruletypes.Duration(1 * time.Minute), + }}, + RuleCondition: &ruletypes.RuleCondition{ + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypePromQL, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "test_metric", + }, + }, + }, + AlertOnAbsent: true, + }, + } + + // time_series_v4 cols of interest + fingerprintCols := []cmock.ColumnType{ + {Name: "fingerprint", Type: "UInt64"}, + {Name: "any(labels)", Type: "String"}, + } + + cases := []struct { + values []struct { + timestamp time.Time + value float64 + } + expectNoData bool + }{ + { + values: []struct { + timestamp time.Time + value float64 + }{}, + expectNoData: true, + }, + } + + logger := instrumentationtest.New().Logger() + + for idx, c := range cases { + telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{}) + + // no data + fingerprintData := [][]interface{}{} + fingerprintRows := cmock.NewRows(fingerprintCols, fingerprintData) + + // no rows == no data + telemetryStore.Mock(). + ExpectQuery("SELECT fingerprint, any"). + WithArgs("test_metric", "__name__", "test_metric"). + WillReturnRows(fingerprintRows) + + promProvider := prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore) + + var target float64 = 0 + postableRule.RuleCondition.Thresholds = &ruletypes.RuleThresholdData{ + Kind: ruletypes.BasicThresholdKind, + Spec: ruletypes.BasicRuleThresholds{ + { + Name: postableRule.AlertName, + TargetValue: &target, + MatchType: ruletypes.AtleastOnce, + CompareOp: ruletypes.ValueIsEq, + }, + }, + } + postableRule.Annotations = map[string]string{ + "description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})", + "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", + } + + options := clickhouseReader.NewOptions("", "", "archiveNamespace") + reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Duration(time.Second), nil, nil, options) + rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider) + if err != nil { + assert.NoError(t, err) + promProvider.Close() + continue + } + + retVal, err := rule.Eval(context.Background(), evalTime) + if err != nil { + assert.NoError(t, err) + promProvider.Close() + continue + } + + assert.Equal(t, 1, retVal.(int), "case %d", idx) + for _, item := range rule.Active { + if c.expectNoData { + assert.True(t, strings.Contains(item.Labels.Get(qslabels.AlertNameLabel), "[No data]"), "case %d", idx) + } else { + assert.False(t, strings.Contains(item.Labels.Get(qslabels.AlertNameLabel), "[No data]"), "case %d", idx) + } + } + + promProvider.Close() + } +} + +func TestMultipleThresholdPromRule(t *testing.T) { + // fixed base time for deterministic tests + baseTime := time.Unix(1700000000, 0) + evalTime := baseTime.Add(5 * time.Minute) + + postableRule := ruletypes.PostableRule{ + AlertName: "Multiple threshold test", + AlertType: ruletypes.AlertTypeMetric, + RuleType: ruletypes.RuleTypeProm, + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ + EvalWindow: ruletypes.Duration(5 * time.Minute), + Frequency: ruletypes.Duration(1 * time.Minute), + }}, + RuleCondition: &ruletypes.RuleCondition{ + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypePromQL, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "test_metric", + }, + }, + }, + }, + } + + fingerprintCols := []cmock.ColumnType{ + {Name: "fingerprint", Type: "UInt64"}, + {Name: "any(labels)", Type: "String"}, + } + + samplesCols := []cmock.ColumnType{ + {Name: "metric_name", Type: "String"}, + {Name: "fingerprint", Type: "UInt64"}, + {Name: "unix_milli", Type: "Int64"}, + {Name: "value", Type: "Float64"}, + {Name: "flags", Type: "UInt32"}, + } + + // see .Timestamps of base rule + evalWindowMs := int64(5 * 60 * 1000) + evalTimeMs := evalTime.UnixMilli() + queryStart := ((evalTimeMs-2*evalWindowMs)/60000)*60000 + 1 + queryEnd := (evalTimeMs / 60000) * 60000 + + cases := []struct { + targetUnit string + yAxisUnit string + values []struct { + timestamp time.Time + value float64 + } + expectAlerts int + compareOp string + matchType string + target float64 + secondTarget float64 + summaryAny []string + }{ + { + targetUnit: "s", + yAxisUnit: "ns", + values: []struct { + timestamp time.Time + value float64 + }{ + {baseTime, 572588400}, // 0.57 seconds + {baseTime.Add(1 * time.Minute), 572386400}, // 0.57 seconds + {baseTime.Add(2 * time.Minute), 300947400}, // 0.3 seconds + {baseTime.Add(3 * time.Minute), 299316000}, // 0.3 seconds + {baseTime.Add(4 * time.Minute), 66640400.00000001}, // 0.06 seconds + }, + expectAlerts: 1, + compareOp: "1", // Above + matchType: "1", // Once + target: 1, // 1 second + secondTarget: .5, + summaryAny: []string{ + "observed metric value is 573 ms", + "observed metric value is 572 ms", + }, + }, + { + targetUnit: "ms", + yAxisUnit: "ns", + values: []struct { + timestamp time.Time + value float64 + }{ + {baseTime, 572588400}, // 572.58 ms + {baseTime.Add(1 * time.Minute), 572386400}, // 572.38 ms + {baseTime.Add(2 * time.Minute), 300947400}, // 300.94 ms + {baseTime.Add(3 * time.Minute), 299316000}, // 299.31 ms + {baseTime.Add(4 * time.Minute), 66640400.00000001}, // 66.64 ms + }, + expectAlerts: 2, // One alert per threshold that fires + compareOp: "1", // Above + matchType: "1", // Once + target: 200, // 200 ms + secondTarget: 500, + summaryAny: []string{ + "observed metric value is 299 ms", + "the observed metric value is 573 ms", + "the observed metric value is 572 ms", + "the observed metric value is 301 ms", + }, + }, + { + targetUnit: "decgbytes", + yAxisUnit: "bytes", + values: []struct { + timestamp time.Time + value float64 + }{ + {baseTime, 2863284053}, // 2.86 GB + {baseTime.Add(1 * time.Minute), 2863388842}, // 2.86 GB + {baseTime.Add(2 * time.Minute), 300947400}, // 0.3 GB + {baseTime.Add(3 * time.Minute), 299316000}, // 0.3 GB + {baseTime.Add(4 * time.Minute), 66640400.00000001}, // 66.64 MB + }, + expectAlerts: 1, + compareOp: "1", // Above + matchType: "1", // Once + target: 200, // 200 GB + secondTarget: 2, // 2GB + summaryAny: []string{ + "observed metric value is 2.7 GiB", + "the observed metric value is 0.3 GB", + }, + }, + } + + logger := instrumentationtest.New().Logger() + + for idx, c := range cases { + telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{}) + + fingerprint := uint64(12345) + labelsJSON := `{"__name__":"test_metric"}` + fingerprintData := [][]interface{}{ + {fingerprint, labelsJSON}, + } + fingerprintRows := cmock.NewRows(fingerprintCols, fingerprintData) + + samplesData := make([][]interface{}, len(c.values)) + for i, v := range c.values { + samplesData[i] = []interface{}{ + "test_metric", + fingerprint, + v.timestamp.UnixMilli(), + v.value, + uint32(0), + } + } + samplesRows := cmock.NewRows(samplesCols, samplesData) + + telemetryStore.Mock(). + ExpectQuery("SELECT fingerprint, any"). + WithArgs("test_metric", "__name__", "test_metric"). + WillReturnRows(fingerprintRows) + + telemetryStore.Mock(). + ExpectQuery("SELECT metric_name, fingerprint, unix_milli"). + WithArgs( + "test_metric", + "test_metric", + "__name__", + "test_metric", + queryStart, + queryEnd, + ). + WillReturnRows(samplesRows) + + promProvider := prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore) + + postableRule.RuleCondition.CompareOp = ruletypes.CompareOp(c.compareOp) + postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType) + postableRule.RuleCondition.Target = &c.target + postableRule.RuleCondition.CompositeQuery.Unit = c.yAxisUnit + postableRule.RuleCondition.TargetUnit = c.targetUnit + postableRule.RuleCondition.Thresholds = &ruletypes.RuleThresholdData{ + Kind: ruletypes.BasicThresholdKind, + Spec: ruletypes.BasicRuleThresholds{ + { + Name: "first_threshold", + TargetValue: &c.target, + TargetUnit: c.targetUnit, + MatchType: ruletypes.MatchType(c.matchType), + CompareOp: ruletypes.CompareOp(c.compareOp), + }, + { + Name: "second_threshold", + TargetValue: &c.secondTarget, + TargetUnit: c.targetUnit, + MatchType: ruletypes.MatchType(c.matchType), + CompareOp: ruletypes.CompareOp(c.compareOp), + }, + }, + } + postableRule.Annotations = map[string]string{ + "description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})", + "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", + } + + options := clickhouseReader.NewOptions("", "", "archiveNamespace") + reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Duration(time.Second), nil, nil, options) + rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider) + if err != nil { + assert.NoError(t, err) + promProvider.Close() + continue + } + + retVal, err := rule.Eval(context.Background(), evalTime) + if err != nil { + assert.NoError(t, err) + promProvider.Close() + continue + } + + assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx) + if c.expectAlerts != 0 { + foundCount := 0 + for _, item := range rule.Active { + for _, summary := range c.summaryAny { + if strings.Contains(item.Annotations.Get("summary"), summary) { + foundCount++ + break + } + } + } + assert.Equal(t, c.expectAlerts, foundCount, "case %d", idx) + } + + promProvider.Close() } } diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index 2990b1819b..f3e9804680 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -488,7 +488,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) { AlertName: "Test Eval Delay", AlertType: ruletypes.AlertTypeMetric, RuleType: ruletypes.RuleTypeThreshold, - Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{ + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ EvalWindow: ruletypes.Duration(5 * time.Minute), Frequency: ruletypes.Duration(1 * time.Minute), }}, @@ -551,7 +551,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) { AlertName: "Tricky Condition Tests", AlertType: ruletypes.AlertTypeMetric, RuleType: ruletypes.RuleTypeThreshold, - Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{ + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ EvalWindow: ruletypes.Duration(5 * time.Minute), Frequency: ruletypes.Duration(1 * time.Minute), }}, @@ -620,7 +620,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { AlertName: "Units test", AlertType: ruletypes.AlertTypeMetric, RuleType: ruletypes.RuleTypeThreshold, - Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{ + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ EvalWindow: ruletypes.Duration(5 * time.Minute), Frequency: ruletypes.Duration(1 * time.Minute), }}, @@ -784,7 +784,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { }, ) require.NoError(t, err) - reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil, readerCache, options) + reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options) rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { @@ -821,7 +821,7 @@ func TestThresholdRuleNoData(t *testing.T) { AlertName: "No data test", AlertType: ruletypes.AlertTypeMetric, RuleType: ruletypes.RuleTypeThreshold, - Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{ + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ EvalWindow: ruletypes.Duration(5 * time.Minute), Frequency: ruletypes.Duration(1 * time.Minute), }}, @@ -899,7 +899,7 @@ func TestThresholdRuleNoData(t *testing.T) { ) assert.NoError(t, err) options := clickhouseReader.NewOptions("", "", "archiveNamespace") - reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil, readerCache, options) + reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options) rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ @@ -932,7 +932,7 @@ func TestThresholdRuleTracesLink(t *testing.T) { AlertName: "Traces link test", AlertType: ruletypes.AlertTypeTraces, RuleType: ruletypes.RuleTypeThreshold, - Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{ + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ EvalWindow: ruletypes.Duration(5 * time.Minute), Frequency: ruletypes.Duration(1 * time.Minute), }}, @@ -1019,7 +1019,7 @@ func TestThresholdRuleTracesLink(t *testing.T) { } options := clickhouseReader.NewOptions("", "", "archiveNamespace") - reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil, nil, options) + reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, nil, options) rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ @@ -1057,7 +1057,7 @@ func TestThresholdRuleLogsLink(t *testing.T) { AlertName: "Logs link test", AlertType: ruletypes.AlertTypeLogs, RuleType: ruletypes.RuleTypeThreshold, - Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{ + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ EvalWindow: ruletypes.Duration(5 * time.Minute), Frequency: ruletypes.Duration(1 * time.Minute), }}, @@ -1156,7 +1156,7 @@ func TestThresholdRuleLogsLink(t *testing.T) { } options := clickhouseReader.NewOptions("", "", "archiveNamespace") - reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil, nil, options) + reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, nil, options) rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ @@ -1195,7 +1195,7 @@ func TestThresholdRuleShiftBy(t *testing.T) { AlertName: "Logs link test", AlertType: ruletypes.AlertTypeLogs, RuleType: ruletypes.RuleTypeThreshold, - Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{ + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ EvalWindow: ruletypes.Duration(5 * time.Minute), Frequency: ruletypes.Duration(1 * time.Minute), }}, @@ -1269,7 +1269,7 @@ func TestMultipleThresholdRule(t *testing.T) { AlertName: "Mulitple threshold test", AlertType: ruletypes.AlertTypeMetric, RuleType: ruletypes.RuleTypeThreshold, - Evaluation: &ruletypes.EvaluationEnvelope{ruletypes.RollingEvaluation, ruletypes.RollingWindow{ + Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{ EvalWindow: ruletypes.Duration(5 * time.Minute), Frequency: ruletypes.Duration(1 * time.Minute), }}, @@ -1423,7 +1423,7 @@ func TestMultipleThresholdRule(t *testing.T) { }, ) require.NoError(t, err) - reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil, readerCache, options) + reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options) rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": {