fix: add support for minimum data points in PromQL alerts (#9975)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled

This commit is contained in:
Jatinderjit Singh
2026-01-21 16:00:30 +05:30
committed by GitHub
parent 8629c959f0
commit 0865d2edaf
14 changed files with 482 additions and 243 deletions

View File

@@ -240,11 +240,9 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
for _, series := range queryResult.AnomalyScores {
if r.Condition() != nil && r.Condition().RequireMinPoints {
if len(series.Points) < r.Condition().RequiredNumPoints {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
results, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
@@ -305,11 +303,9 @@ func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID,
}
for _, series := range seriesToProcess {
if r.Condition().RequireMinPoints {
if len(series.Points) < r.Condition().RequiredNumPoints {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
results, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
@@ -323,7 +319,7 @@ func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID,
return resultVector, nil
}
func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) {
func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
prevState := r.State()
@@ -340,7 +336,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, erro
res, err = r.buildAndRunQuery(ctx, r.OrgID(), ts)
}
if err != nil {
return nil, err
return 0, err
}
r.mtx.Lock()
@@ -415,7 +411,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, erro
if _, ok := alerts[h]; ok {
r.logger.ErrorContext(ctx, "the alert query returns duplicate records", "rule_id", r.ID(), "alert", alerts[h])
err = fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
return nil, err
return 0, err
}
alerts[h] = &ruletypes.Alert{

View File

@@ -10,7 +10,7 @@ import (
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"go.uber.org/zap"
@@ -47,7 +47,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
rules = append(rules, tr)
// create ch rule task for evalution
// create ch rule task for evaluation
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
@@ -71,7 +71,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
rules = append(rules, pr)
// create promql rule task for evalution
// create promql rule task for evaluation
task = newTask(baserules.TaskTypeProm, opts.TaskName, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
} else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly {
@@ -95,7 +95,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
rules = append(rules, ar)
// create anomaly rule task for evalution
// create anomaly rule task for evaluation
task = newTask(baserules.TaskTypeCh, opts.TaskName, time.Duration(evaluation.GetFrequency()), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
} else {
@@ -203,16 +203,12 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
// set timestamp to current utc time
ts := time.Now().UTC()
count, err := rule.Eval(ctx, ts)
alertsFound, err := rule.Eval(ctx, ts)
if err != nil {
zap.L().Error("evaluating rule failed", zap.String("rule", rule.Name()), zap.Error(err))
return 0, basemodel.InternalError(fmt.Errorf("rule evaluation failed"))
}
alertsFound, ok := count.(int)
if !ok {
return 0, basemodel.InternalError(fmt.Errorf("something went wrong"))
}
rule.SendAlerts(ctx, ts, 0, time.Duration(1*time.Minute), opts.NotifyFunc)
rule.SendAlerts(ctx, ts, 0, time.Minute, opts.NotifyFunc)
return alertsFound, nil
}

View File

@@ -1,6 +1,10 @@
package prometheus
import "github.com/SigNoz/signoz/pkg/factory"
import (
"time"
"github.com/SigNoz/signoz/pkg/factory"
)
type ActiveQueryTrackerConfig struct {
Enabled bool `mapstructure:"enabled"`
@@ -10,6 +14,12 @@ type ActiveQueryTrackerConfig struct {
// Config holds the settings of the prometheus package: the active query
// tracker settings and the lookback delta handed to the PromQL engine.
type Config struct {
// ActiveQueryTrackerConfig is decoded from the "active_query_tracker" key.
ActiveQueryTrackerConfig ActiveQueryTrackerConfig `mapstructure:"active_query_tracker"`
// LookbackDelta determines the time since the last sample after which a time
// series is considered stale.
//
// If not set, the prometheus default is used (currently 5m).
//
// It is decoded from the "lookback_delta" key and NewEngine passes it on as
// promql.EngineOpts.LookbackDelta.
LookbackDelta time.Duration `mapstructure:"lookback_delta"`
}
func NewConfigFactory() factory.ConfigFactory {

View File

@@ -20,8 +20,9 @@ func NewEngine(logger *slog.Logger, cfg Config) *Engine {
return promql.NewEngine(promql.EngineOpts{
Logger: logger,
Reg: nil,
MaxSamples: 50000000,
Timeout: time.Duration(2 * time.Minute),
MaxSamples: 5_0000_000,
Timeout: 2 * time.Minute,
ActiveQueryTracker: activeQueryTracker,
LookbackDelta: cfg.LookbackDelta,
})
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"net/url"
"sync"
"time"
@@ -16,7 +15,7 @@ import (
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/sqlstore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
@@ -37,7 +36,7 @@ type BaseRule struct {
Threshold ruletypes.RuleThreshold
// evalWindow is the time window used for evaluating the rule
// i.e each time we lookback from the current time, we look at data for the last
// i.e. each time we lookback from the current time, we look at data for the last
// evalWindow duration
evalWindow time.Duration
// holdDuration is the duration for which the alert waits before firing
@@ -64,28 +63,26 @@ type BaseRule struct {
lastError error
Active map[uint64]*ruletypes.Alert
// lastTimestampWithDatapoints is the timestamp of the last datapoint we observed
// for this rule
// this is used for missing data alerts
// lastTimestampWithDatapoints is the timestamp of the last datapoint we
// observed for this rule.
// This is used for missing data alerts.
lastTimestampWithDatapoints time.Time
reader interfaces.Reader
logger *slog.Logger
// sendUnmatched sends observed metric values
// even if they dont match the rule condition. this is
// useful in testing the rule
// sendUnmatched sends observed metric values even if they don't match the
// rule condition. This is useful in testing the rule.
sendUnmatched bool
// sendAlways will send alert irresepective of resendDelay
// or other params
// sendAlways will send alert irrespective of resendDelay or other params
sendAlways bool
// TemporalityMap is a map of metric name to temporality
// to avoid fetching temporality for the same metric multiple times
// querying the v4 table on low cardinal temporality column
// should be fast but we can still avoid the query if we have the data in memory
// TemporalityMap is a map of metric name to temporality to avoid fetching
// temporality for the same metric multiple times.
// Querying the v4 table on low cardinal temporality column should be fast,
// but we can still avoid the query if we have the data in memory.
TemporalityMap map[string]map[v3.Temporality]bool
sqlstore sqlstore.SQLStore
@@ -258,17 +255,6 @@ func (r *BaseRule) HoldDuration() time.Duration {
return r.holdDuration
}
// hostFromSource returns the "scheme://host[:port]" prefix of r.source, or an
// empty string when r.source cannot be parsed as a URL.
//
// url.URL.Hostname strips the square brackets from IPv6 literals, so they are
// put back here; otherwise a source such as "http://[::1]:3301/alerts" would
// produce the malformed prefix "http://::1:3301".
func (r *ThresholdRule) hostFromSource() string {
	parsedURL, err := url.Parse(r.source)
	if err != nil {
		return ""
	}

	host := parsedURL.Hostname()
	if len(parsedURL.Host) > 0 && parsedURL.Host[0] == '[' {
		// IPv6 literal: restore the brackets that Hostname removed.
		host = "[" + host + "]"
	}

	if port := parsedURL.Port(); port != "" {
		return fmt.Sprintf("%s://%s:%s", parsedURL.Scheme, host, port)
	}
	return fmt.Sprintf("%s://%s", parsedURL.Scheme, host)
}
// ID returns the rule's identifier.
func (r *BaseRule) ID() string { return r.id }
// OrgID returns the UUID of the organization stored on the rule.
func (r *BaseRule) OrgID() valuer.UUID { return r.orgID }
// Name returns the rule's name.
func (r *BaseRule) Name() string { return r.name }
@@ -289,14 +275,13 @@ func (r *BaseRule) Unit() string {
}
func (r *BaseRule) Timestamps(ts time.Time) (time.Time, time.Time) {
st, en := r.evaluation.NextWindowFor(ts)
start := st.UnixMilli()
end := en.UnixMilli()
if r.evalDelay > 0 {
start = start - int64(r.evalDelay.Milliseconds())
end = end - int64(r.evalDelay.Milliseconds())
start = start - r.evalDelay.Milliseconds()
end = end - r.evalDelay.Milliseconds()
}
// round to minute otherwise we could potentially miss data
start = start - (start % (60 * 1000))
@@ -509,7 +494,6 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
}
func (r *BaseRule) PopulateTemporality(ctx context.Context, orgID valuer.UUID, qp *v3.QueryRangeParamsV3) error {
missingTemporality := make([]string, 0)
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {

View File

@@ -26,79 +26,6 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
)
func TestBaseRule_RequireMinPoints(t *testing.T) {
threshold := 1.0
tests := []struct {
name string
rule *BaseRule
shouldAlert bool
series *v3.Series
}{
{
name: "test should skip if less than min points",
rule: &BaseRule{
ruleCondition: &ruletypes.RuleCondition{
RequireMinPoints: true,
RequiredNumPoints: 4,
},
Threshold: ruletypes.BasicRuleThresholds{
{
Name: "test-threshold",
TargetValue: &threshold,
CompareOp: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
},
},
},
series: &v3.Series{
Points: []v3.Point{
{Value: 1},
{Value: 2},
},
},
shouldAlert: false,
},
{
name: "test should alert if more than min points",
rule: &BaseRule{
ruleCondition: &ruletypes.RuleCondition{
RequireMinPoints: true,
RequiredNumPoints: 4,
CompareOp: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
Target: &threshold,
},
Threshold: ruletypes.BasicRuleThresholds{
{
Name: "test-threshold",
TargetValue: &threshold,
CompareOp: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
},
},
},
series: &v3.Series{
Points: []v3.Point{
{Value: 1},
{Value: 2},
{Value: 3},
{Value: 4},
},
},
shouldAlert: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err := test.rule.Threshold.Eval(*test.series, "", ruletypes.EvalData{})
require.NoError(t, err)
require.Equal(t, len(test.series.Points) >= test.rule.ruleCondition.RequiredNumPoints, test.shouldAlert)
})
}
}
// createTestSeries creates a *v3.Series with the given labels and optional points
// so we don't exactly need the points in the series because the labels are used to determine if the series is new or old
// we use the labels to create a lookup key for the series and then check the first_seen timestamp for the series in the metadata table

View File

@@ -37,7 +37,6 @@ func NewPromRule(
prometheus prometheus.Prometheus,
opts ...RuleOption,
) (*PromRule, error) {
opts = append(opts, WithLogger(logger))
baseRule, err := NewBaseRule(id, orgID, postableRule, reader, opts...)
@@ -53,7 +52,6 @@ func NewPromRule(
p.logger = logger
query, err := p.getPqlQuery()
if err != nil {
// can not generate a valid prom QL query
return nil, err
@@ -83,7 +81,6 @@ func (r *PromRule) GetSelectedQuery() string {
}
func (r *PromRule) getPqlQuery() (string, error) {
if r.version == "v5" {
if len(r.ruleCondition.CompositeQuery.Queries) > 0 {
selectedQuery := r.GetSelectedQuery()
@@ -158,9 +155,16 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
var resultVector ruletypes.Vector
for _, series := range matrixToProcess {
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(
ctx, "not enough data points to evaluate series, skipping",
"rule_id", r.ID(), "num_points", len(series.Points), "required_points", r.Condition().RequiredNumPoints,
)
continue
}
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
SendUnmatched: r.ShouldSendUnmatched(),
SendUnmatched: r.ShouldSendUnmatched(),
})
if err != nil {
return nil, err
@@ -170,14 +174,14 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
return resultVector, nil
}
func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) {
func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
prevState := r.State()
valueFormatter := formatter.FromUnit(r.Unit())
// prepare query, run query get data and filter the data based on the threshold
results, err := r.buildAndRunQuery(ctx, ts)
if err != nil {
return nil, err
return 0, err
}
r.mtx.Lock()
@@ -185,7 +189,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
resultFPs := map[uint64]struct{}{}
var alerts = make(map[uint64]*ruletypes.Alert, len(results))
alerts := make(map[uint64]*ruletypes.Alert, len(results))
ruleReceivers := r.Threshold.GetRuleReceivers()
ruleReceiverMap := make(map[string][]string)
@@ -208,7 +212,6 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
expand := func(text string) string {
tmpl := ruletypes.NewTemplateExpander(
ctx,
defs+text,
@@ -251,7 +254,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
// SetLastError will deadlock.
r.health = ruletypes.HealthBad
r.lastError = err
return nil, err
return 0, err
}
alerts[h] = &ruletypes.Alert{
Labels: lbs,
@@ -378,7 +381,6 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error)
}
func (r *PromRule) String() string {
ar := ruletypes.PostableRule{
AlertName: r.name,
RuleCondition: r.ruleCondition,

View File

@@ -6,6 +6,12 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
pql "github.com/prometheus/prometheus/promql"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
@@ -14,11 +20,8 @@ import (
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
pql "github.com/prometheus/prometheus/promql"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/assert"
)
func getVectorValues(vectors []ruletypes.Sample) []float64 {
@@ -961,7 +964,7 @@ func TestPromRuleUnitCombinations(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Duration(time.Second), nil, nil, options)
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
if err != nil {
assert.NoError(t, err)
@@ -969,14 +972,14 @@ func TestPromRuleUnitCombinations(t *testing.T) {
continue
}
retVal, err := rule.Eval(context.Background(), evalTime)
alertsFound, err := rule.Eval(context.Background(), evalTime)
if err != nil {
assert.NoError(t, err)
promProvider.Close()
continue
}
assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
assert.Equal(t, c.expectAlerts, alertsFound, "case %d", idx)
if c.expectAlerts != 0 {
foundCount := 0
for _, item := range rule.Active {
@@ -1077,7 +1080,7 @@ func _Enable_this_after_9146_issue_fix_is_merged_TestPromRuleNoData(t *testing.T
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Duration(time.Second), nil, nil, options)
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
if err != nil {
assert.NoError(t, err)
@@ -1085,14 +1088,14 @@ func _Enable_this_after_9146_issue_fix_is_merged_TestPromRuleNoData(t *testing.T
continue
}
retVal, err := rule.Eval(context.Background(), evalTime)
alertsFound, err := rule.Eval(context.Background(), evalTime)
if err != nil {
assert.NoError(t, err)
promProvider.Close()
continue
}
assert.Equal(t, 1, retVal.(int), "case %d", idx)
assert.Equal(t, 1, alertsFound, "case %d", idx)
for _, item := range rule.Active {
if c.expectNoData {
assert.True(t, strings.Contains(item.Labels.Get(qslabels.AlertNameLabel), "[No data]"), "case %d", idx)
@@ -1309,7 +1312,7 @@ func TestMultipleThresholdPromRule(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Duration(time.Second), nil, nil, options)
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
if err != nil {
assert.NoError(t, err)
@@ -1317,14 +1320,14 @@ func TestMultipleThresholdPromRule(t *testing.T) {
continue
}
retVal, err := rule.Eval(context.Background(), evalTime)
alertsFound, err := rule.Eval(context.Background(), evalTime)
if err != nil {
assert.NoError(t, err)
promProvider.Close()
continue
}
assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
assert.Equal(t, c.expectAlerts, alertsFound, "case %d", idx)
if c.expectAlerts != 0 {
foundCount := 0
for _, item := range rule.Active {
@@ -1341,3 +1344,156 @@ func TestMultipleThresholdPromRule(t *testing.T) {
promProvider.Close()
}
}
// TestPromRuleEval_RequireMinPoints checks how PromRule.Eval combines the alert
// condition with the RequireMinPoints / RequiredNumPoints settings. Every case
// runs the same PromQL query ("test_metric") against the same three mocked
// samples and only varies the target and the minimum-points settings.
func TestPromRuleEval_RequireMinPoints(t *testing.T) {
// fixed base time for deterministic tests
baseTime := time.Unix(1700000000, 0)
evalTime := baseTime.Add(5 * time.Minute)
evalWindow := 5 * time.Minute
lookBackDelta := time.Minute
// Rule template shared by all cases; its RuleCondition is a pointer and is
// reconfigured per case in the loop below.
postableRule := ruletypes.PostableRule{
AlertName: "Unit test",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeProm,
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(evalWindow),
Frequency: ruletypes.Duration(time.Minute),
}},
RuleCondition: &ruletypes.RuleCondition{
CompareOp: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{
"A": {Query: "test_metric"},
},
},
},
}
// Columns and the single row returned by the mocked fingerprint lookup.
fingerprintCols := []cmock.ColumnType{
{Name: "fingerprint", Type: "UInt64"},
{Name: "any(labels)", Type: "String"},
}
fingerprint := uint64(12345)
fingerprintData := [][]any{{fingerprint, `{"__name__":"test_metric"}`}}
// Columns and rows returned by the mocked samples query: three samples, one
// minute apart, with values 100, 150 and 250.
samplesCols := []cmock.ColumnType{
{Name: "metric_name", Type: "String"},
{Name: "fingerprint", Type: "UInt64"},
{Name: "unix_milli", Type: "Int64"},
{Name: "value", Type: "Float64"},
{Name: "flags", Type: "UInt32"},
}
samplesData := [][]any{
{"test_metric", fingerprint, baseTime.UnixMilli(), 100.0, 0},
{"test_metric", fingerprint, baseTime.Add(time.Minute).UnixMilli(), 150.0, 0},
{"test_metric", fingerprint, baseTime.Add(2 * time.Minute).UnixMilli(), 250.0, 0},
}
// With CompareOp ValueIsAbove and MatchType AtleastOnce, a target of 200 is
// exceeded by the 250 sample, while a target of 500 is never exceeded.
targetForAlert := 200.0
targetForNoAlert := 500.0
// see Timestamps on base_rule
evalTimeMs := evalTime.UnixMilli()
queryStart := ((evalTimeMs-evalWindow.Milliseconds()-lookBackDelta.Milliseconds())/60000)*60000 + 1 // truncate to minute + 1ms
queryEnd := (evalTimeMs / 60000) * 60000 // truncate to minute
// The expectations below treat the mocked series as having three points:
// requiring 2 or 3 points still alerts, requiring 4 suppresses the alert.
cases := []struct {
description string
alertCondition bool
requireMinPoints bool
requiredNumPoints int
expectAlerts int
}{
{
description: "AlertCondition=false, RequireMinPoints=false",
alertCondition: false,
requireMinPoints: false,
expectAlerts: 0,
},
{
description: "AlertCondition=true, RequireMinPoints=false",
alertCondition: true,
requireMinPoints: false,
expectAlerts: 1,
},
{
description: "AlertCondition=true, RequireMinPoints=true, NumPoints=more_than_required",
alertCondition: true,
requireMinPoints: true,
requiredNumPoints: 2,
expectAlerts: 1,
},
{
description: "AlertCondition=true, RequireMinPoints=true, NumPoints=same_as_required",
alertCondition: true,
requireMinPoints: true,
requiredNumPoints: 3,
expectAlerts: 1,
},
{
description: "AlertCondition=true, RequireMinPoints=true, NumPoints=insufficient",
alertCondition: true,
requireMinPoints: true,
requiredNumPoints: 4,
expectAlerts: 0,
},
}
logger := instrumentationtest.New().Logger()
for _, c := range cases {
// rc aliases postableRule.RuleCondition (a pointer), so the assignments
// below reconfigure the shared rule for the current case before its
// subtest runs.
rc := postableRule.RuleCondition
rc.Target = &targetForNoAlert
if c.alertCondition {
rc.Target = &targetForAlert
}
rc.RequireMinPoints = c.requireMinPoints
rc.RequiredNumPoints = c.requiredNumPoints
// The threshold spec mirrors the target, match type and compare op that
// were just set on the condition.
rc.Thresholds = &ruletypes.RuleThresholdData{
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{
{
Name: postableRule.AlertName,
TargetValue: rc.Target,
MatchType: rc.MatchType,
CompareOp: rc.CompareOp,
},
},
}
t.Run(c.description, func(t *testing.T) {
// Each subtest gets its own mocked telemetry store with two expected
// queries: the fingerprint lookup, then the samples fetch bounded by
// queryStart and queryEnd.
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
telemetryStore.Mock().
ExpectQuery("SELECT fingerprint, any").
WithArgs("test_metric", "__name__", "test_metric").
WillReturnRows(cmock.NewRows(fingerprintCols, fingerprintData))
telemetryStore.Mock().
ExpectQuery("SELECT metric_name, fingerprint, unix_milli").
WithArgs("test_metric", "test_metric", "__name__", "test_metric", queryStart, queryEnd).
WillReturnRows(cmock.NewRows(samplesCols, samplesData))
// The provider is built with the same lookback delta that was used to
// compute queryStart above.
promProvider := prometheustest.New(
context.Background(),
instrumentationtest.New().ToProviderSettings(),
prometheus.Config{LookbackDelta: lookBackDelta},
telemetryStore,
)
defer func() {
_ = promProvider.Close()
}()
options := clickhouseReader.NewOptions("primaryNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
rule, err := NewPromRule("some-id", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
require.NoError(t, err)
// Eval returns the number of alerts found for this evaluation.
alertsFound, err := rule.Eval(context.Background(), evalTime)
require.NoError(t, err)
assert.Equal(t, c.expectAlerts, alertsFound)
})
}
}

View File

@@ -29,7 +29,8 @@ type Rule interface {
PreferredChannels() []string
Eval(context.Context, time.Time) (interface{}, error)
// Eval evaluates the rule at the given timestamp and returns the number of active alerts.
Eval(context.Context, time.Time) (int, error)
String() string
SetLastError(error)
LastError() error

View File

@@ -89,15 +89,11 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
// set timestamp to current utc time
ts := time.Now().UTC()
count, err := rule.Eval(ctx, ts)
alertsFound, err := rule.Eval(ctx, ts)
if err != nil {
zap.L().Error("evaluating rule failed", zap.String("rule", rule.Name()), zap.Error(err))
return 0, model.InternalError(fmt.Errorf("rule evaluation failed"))
}
alertsFound, ok := count.(int)
if !ok {
return 0, model.InternalError(fmt.Errorf("something went wrong"))
}
rule.SendAlerts(ctx, ts, 0, time.Duration(1*time.Minute), opts.NotifyFunc)
return alertsFound, nil

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"math"
"net/url"
"reflect"
"text/template"
"time"
@@ -16,7 +17,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/transition"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -70,7 +71,6 @@ func NewThresholdRule(
logger *slog.Logger,
opts ...RuleOption,
) (*ThresholdRule, error) {
logger.Info("creating new ThresholdRule", "id", id)
opts = append(opts, WithLogger(logger))
@@ -104,12 +104,22 @@ func NewThresholdRule(
return &t, nil
}
// hostFromSource returns the "scheme://host[:port]" prefix of r.source, or an
// empty string when r.source cannot be parsed as a URL.
//
// url.URL.Hostname strips the square brackets from IPv6 literals, so they are
// put back here; otherwise a source such as "http://[::1]:3301/alerts" would
// produce the malformed prefix "http://::1:3301".
func (r *ThresholdRule) hostFromSource() string {
	parsedURL, err := url.Parse(r.source)
	if err != nil {
		return ""
	}

	host := parsedURL.Hostname()
	if len(parsedURL.Host) > 0 && parsedURL.Host[0] == '[' {
		// IPv6 literal: restore the brackets that Hostname removed.
		host = "[" + host + "]"
	}

	if port := parsedURL.Port(); port != "" {
		return fmt.Sprintf("%s://%s:%s", parsedURL.Scheme, host, port)
	}
	return fmt.Sprintf("%s://%s", parsedURL.Scheme, host)
}
// Type reports the rule type of a ThresholdRule, which is always
// ruletypes.RuleTypeThreshold.
func (r *ThresholdRule) Type() ruletypes.RuleType {
return ruletypes.RuleTypeThreshold
}
func (r *ThresholdRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v3.QueryRangeParamsV3, error) {
r.logger.InfoContext(
ctx, "prepare query range request v4", "ts", ts.UnixMilli(), "eval_window", r.evalWindow.Milliseconds(), "eval_delay", r.evalDelay.Milliseconds(),
)
@@ -130,7 +140,7 @@ func (r *ThresholdRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v
PromQueries: make(map[string]*v3.PromQuery),
Unit: r.ruleCondition.CompositeQuery.Unit,
},
Variables: make(map[string]interface{}, 0),
Variables: make(map[string]interface{}),
NoCache: true,
}
querytemplate.AssignReservedVarsV3(params)
@@ -188,13 +198,12 @@ func (r *ThresholdRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v
End: end,
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
CompositeQuery: r.ruleCondition.CompositeQuery,
Variables: make(map[string]interface{}, 0),
Variables: make(map[string]interface{}),
NoCache: true,
}, nil
}
func (r *ThresholdRule) prepareLinksToLogs(ctx context.Context, ts time.Time, lbls labels.Labels) string {
if r.version == "v5" {
return r.prepareLinksToLogsV5(ctx, ts, lbls)
}
@@ -233,7 +242,6 @@ func (r *ThresholdRule) prepareLinksToLogs(ctx context.Context, ts time.Time, lb
}
func (r *ThresholdRule) prepareLinksToTraces(ctx context.Context, ts time.Time, lbls labels.Labels) string {
if r.version == "v5" {
return r.prepareLinksToTracesV5(ctx, ts, lbls)
}
@@ -272,7 +280,6 @@ func (r *ThresholdRule) prepareLinksToTraces(ctx context.Context, ts time.Time,
}
func (r *ThresholdRule) prepareQueryRangeV5(ctx context.Context, ts time.Time) (*qbtypes.QueryRangeRequest, error) {
r.logger.InfoContext(
ctx, "prepare query range request v5", "ts", ts.UnixMilli(), "eval_window", r.evalWindow.Milliseconds(), "eval_delay", r.evalDelay.Milliseconds(),
)
@@ -379,7 +386,6 @@ func (r *ThresholdRule) GetSelectedQuery() string {
}
func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRange(ctx, ts)
if err != nil {
return nil, err
@@ -482,11 +488,9 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
}
for _, series := range queryResult.Series {
if r.Condition() != nil && r.Condition().RequireMinPoints {
if len(series.Points) < r.ruleCondition.RequiredNumPoints {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
@@ -502,7 +506,6 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
}
func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRangeV5(ctx, ts)
if err != nil {
return nil, err
@@ -511,7 +514,6 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
var results []*v3.Result
v5Result, err := r.querierV5.QueryRange(ctx, orgID, params)
if err != nil {
r.logger.ErrorContext(ctx, "failed to get alert query result", "rule_name", r.Name(), "error", err)
return nil, fmt.Errorf("internal error while querying")
@@ -574,11 +576,9 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
}
for _, series := range seriesToProcess {
if r.Condition() != nil && r.Condition().RequireMinPoints {
if len(series.Points) < r.Condition().RequiredNumPoints {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
@@ -593,8 +593,7 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
return resultVector, nil
}
func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) {
func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
prevState := r.State()
valueFormatter := formatter.FromUnit(r.Unit())
@@ -611,14 +610,14 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
}
if err != nil {
return nil, err
return 0, err
}
r.mtx.Lock()
defer r.mtx.Unlock()
resultFPs := map[uint64]struct{}{}
var alerts = make(map[uint64]*ruletypes.Alert, len(res))
alerts := make(map[uint64]*ruletypes.Alert, len(res))
ruleReceivers := r.Threshold.GetRuleReceivers()
ruleReceiverMap := make(map[string][]string)
@@ -633,7 +632,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
}
value := valueFormatter.Format(smpl.V, r.Unit())
//todo(aniket): handle different threshold
// todo(aniket): handle different threshold
threshold := valueFormatter.Format(smpl.Target, smpl.TargetUnit)
r.logger.DebugContext(ctx, "Alert template data for rule", "rule_name", r.Name(), "formatter", valueFormatter.Name(), "value", value, "threshold", threshold)
@@ -644,7 +643,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
// utility function to apply go template on labels and annotations
expand := func(text string) string {
tmpl := ruletypes.NewTemplateExpander(
ctx,
defs+text,
@@ -704,7 +702,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
return nil, fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
return 0, fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
}
alerts[h] = &ruletypes.Alert{
@@ -838,7 +836,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, er
}
func (r *ThresholdRule) String() string {
ar := ruletypes.PostableRule{
AlertName: r.name,
RuleCondition: r.ruleCondition,
@@ -855,13 +852,3 @@ func (r *ThresholdRule) String() string {
return string(byt)
}
// removeGroupinSetPoints returns the points of series whose timestamp is
// non-negative and whose value is finite (neither NaN nor +/-Inf), keeping
// their original order. The result is nil when no point qualifies.
func removeGroupinSetPoints(series v3.Series) []v3.Point {
	var kept []v3.Point
	for i := range series.Points {
		pt := series.Points[i]
		// A value is unusable when it cannot take part in a numeric comparison.
		unusable := math.IsNaN(pt.Value) || math.IsInf(pt.Value, 0)
		if pt.Timestamp >= 0 && !unusable {
			kept = append(kept, pt)
		}
	}
	return kept
}

View File

@@ -2,32 +2,32 @@ package rules
import (
"context"
"fmt"
"math"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/cachetest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/query-service/common"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
func TestThresholdRuleEvalBackwardCompat(t *testing.T) {
@@ -184,8 +184,8 @@ func TestPrepareLinksToLogs(t *testing.T) {
Spec: ruletypes.BasicRuleThresholds{
{
TargetValue: postableRule.RuleCondition.Target,
MatchType: ruletypes.MatchType(postableRule.RuleCondition.MatchType),
CompareOp: ruletypes.CompareOp(postableRule.RuleCondition.CompareOp),
MatchType: postableRule.RuleCondition.MatchType,
CompareOp: postableRule.RuleCondition.CompareOp,
},
},
}
@@ -245,8 +245,8 @@ func TestPrepareLinksToLogsV5(t *testing.T) {
Spec: ruletypes.BasicRuleThresholds{
{
TargetValue: postableRule.RuleCondition.Target,
MatchType: ruletypes.MatchType(postableRule.RuleCondition.MatchType),
CompareOp: ruletypes.CompareOp(postableRule.RuleCondition.CompareOp),
MatchType: postableRule.RuleCondition.MatchType,
CompareOp: postableRule.RuleCondition.CompareOp,
},
},
}
@@ -306,8 +306,8 @@ func TestPrepareLinksToTracesV5(t *testing.T) {
Spec: ruletypes.BasicRuleThresholds{
{
TargetValue: postableRule.RuleCondition.Target,
MatchType: ruletypes.MatchType(postableRule.RuleCondition.MatchType),
CompareOp: ruletypes.CompareOp(postableRule.RuleCondition.CompareOp),
MatchType: postableRule.RuleCondition.MatchType,
CompareOp: postableRule.RuleCondition.CompareOp,
},
},
}
@@ -360,8 +360,8 @@ func TestPrepareLinksToTraces(t *testing.T) {
Spec: ruletypes.BasicRuleThresholds{
{
TargetValue: postableRule.RuleCondition.Target,
MatchType: ruletypes.MatchType(postableRule.RuleCondition.MatchType),
CompareOp: ruletypes.CompareOp(postableRule.RuleCondition.CompareOp),
MatchType: postableRule.RuleCondition.MatchType,
CompareOp: postableRule.RuleCondition.CompareOp,
},
},
}
@@ -509,8 +509,8 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
Spec: ruletypes.BasicRuleThresholds{
{
TargetValue: postableRule.RuleCondition.Target,
MatchType: ruletypes.MatchType(postableRule.RuleCondition.MatchType),
CompareOp: ruletypes.CompareOp(postableRule.RuleCondition.CompareOp),
MatchType: postableRule.RuleCondition.MatchType,
CompareOp: postableRule.RuleCondition.CompareOp,
},
},
}
@@ -572,8 +572,8 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
Spec: ruletypes.BasicRuleThresholds{
{
TargetValue: postableRule.RuleCondition.Target,
MatchType: ruletypes.MatchType(postableRule.RuleCondition.MatchType),
CompareOp: ruletypes.CompareOp(postableRule.RuleCondition.CompareOp),
MatchType: postableRule.RuleCondition.MatchType,
CompareOp: postableRule.RuleCondition.CompareOp,
},
},
}
@@ -789,12 +789,12 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
assert.NoError(t, err)
}
retVal, err := rule.Eval(context.Background(), time.Now())
alertsFound, err := rule.Eval(context.Background(), time.Now())
if err != nil {
assert.NoError(t, err)
}
assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
assert.Equal(t, c.expectAlerts, alertsFound, "case %d", idx)
if c.expectAlerts != 0 {
foundCount := 0
for _, item := range rule.Active {
@@ -905,12 +905,12 @@ func TestThresholdRuleNoData(t *testing.T) {
assert.NoError(t, err)
}
retVal, err := rule.Eval(context.Background(), time.Now())
alertsFound, err := rule.Eval(context.Background(), time.Now())
if err != nil {
assert.NoError(t, err)
}
assert.Equal(t, 1, retVal.(int), "case %d", idx)
assert.Equal(t, 1, alertsFound, "case %d", idx)
for _, item := range rule.Active {
if c.expectNoData {
assert.True(t, strings.Contains(item.Labels.Get(labels.AlertNameLabel), "[No data]"), "case %d", idx)
@@ -1025,15 +1025,15 @@ func TestThresholdRuleTracesLink(t *testing.T) {
assert.NoError(t, err)
}
retVal, err := rule.Eval(context.Background(), time.Now())
alertsFound, err := rule.Eval(context.Background(), time.Now())
if err != nil {
assert.NoError(t, err)
}
if c.expectAlerts == 0 {
assert.Equal(t, 0, retVal.(int), "case %d", idx)
assert.Equal(t, 0, alertsFound, "case %d", idx)
} else {
assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
assert.Equal(t, c.expectAlerts, alertsFound, "case %d", idx)
for _, item := range rule.Active {
for name, value := range item.Annotations.Map() {
if name == "related_traces" {
@@ -1162,15 +1162,15 @@ func TestThresholdRuleLogsLink(t *testing.T) {
assert.NoError(t, err)
}
retVal, err := rule.Eval(context.Background(), time.Now())
alertsFound, err := rule.Eval(context.Background(), time.Now())
if err != nil {
assert.NoError(t, err)
}
if c.expectAlerts == 0 {
assert.Equal(t, 0, retVal.(int), "case %d", idx)
assert.Equal(t, 0, alertsFound, "case %d", idx)
} else {
assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
assert.Equal(t, c.expectAlerts, alertsFound, "case %d", idx)
for _, item := range rule.Active {
for name, value := range item.Annotations.Map() {
if name == "related_logs" {
@@ -1260,7 +1260,7 @@ func TestThresholdRuleShiftBy(t *testing.T) {
func TestMultipleThresholdRule(t *testing.T) {
postableRule := ruletypes.PostableRule{
AlertName: "Mulitple threshold test",
AlertName: "Multiple threshold test",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
@@ -1417,7 +1417,7 @@ func TestMultipleThresholdRule(t *testing.T) {
},
)
require.NoError(t, err)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Second, nil, readerCache, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
@@ -1428,12 +1428,12 @@ func TestMultipleThresholdRule(t *testing.T) {
assert.NoError(t, err)
}
retVal, err := rule.Eval(context.Background(), time.Now())
alertsFound, err := rule.Eval(context.Background(), time.Now())
if err != nil {
assert.NoError(t, err)
}
assert.Equal(t, c.expectAlerts, retVal.(int), "case %d", idx)
assert.Equal(t, c.expectAlerts, alertsFound, "case %d", idx)
if c.expectAlerts != 0 {
foundCount := 0
for _, item := range rule.Active {
@@ -1478,7 +1478,6 @@ func TestThresholdRuleEval_BasicCases(t *testing.T) {
}
runEvalTests(t, postableRule, tcThresholdRuleEval)
}
func TestThresholdRuleEval_MatchPlusCompareOps(t *testing.T) {
@@ -1510,7 +1509,6 @@ func TestThresholdRuleEval_MatchPlusCompareOps(t *testing.T) {
}
runEvalTests(t, postableRule, tcThresholdRuleEvalMatchPlusCompareOps)
}
// TestThresholdRuleEval_SendUnmatchedBypassesRecovery tests the case where the sendUnmatched is true and the recovery target is met.
@@ -2061,3 +2059,181 @@ func TestThresholdRuleEval_MultiThreshold(t *testing.T) {
runMultiThresholdEvalTests(t, postableRule, tcThresholdRuleEvalMultiThreshold)
}
// TestThresholdEval_RequireMinPoints runs a threshold rule against mocked
// query results for rule versions "v3" and "v4" and asserts the number of
// alerts returned by Eval for different combinations of RequireMinPoints and
// RequiredNumPoints on the rule condition.
func TestThresholdEval_RequireMinPoints(t *testing.T) {
// Base rule shared by every case: "value is above target, at least once" over
// a single metrics builder query "A". Per-case fields are filled in below.
postableRule := ruletypes.PostableRule{
AlertName: "Unit test",
AlertType: ruletypes.AlertTypeMetric,
RuleType: ruletypes.RuleTypeThreshold,
Evaluation: &ruletypes.EvaluationEnvelope{Kind: ruletypes.RollingEvaluation, Spec: ruletypes.RollingWindow{
EvalWindow: ruletypes.Duration(5 * time.Minute),
Frequency: ruletypes.Duration(1 * time.Minute),
}},
RuleCondition: &ruletypes.RuleCondition{
CompareOp: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
AggregateAttribute: v3.AttributeKey{Key: "signoz_calls_total"},
AggregateOperator: v3.AggregateOperatorSumRate,
SpaceAggregation: v3.SpaceAggregationSum,
TimeAggregation: v3.TimeAggregationRate,
DataSource: v3.DataSourceMetrics,
Expression: "A",
},
},
},
},
}
// Each case lists the rows the mocked data query returns (in the order of
// dataColumns: value, attr, timestamp), the threshold target, and the alert
// count Eval is expected to return.
cases := []struct {
description string
requireMinPoints bool
requiredNumPoints int
values [][]any
target float64
expectAlerts int
}{
{
// No value exceeds the target of 200, so no alert is expected.
description: "AlertCondition=false, RequireMinPoints=false",
requireMinPoints: false,
values: [][]any{
{100.0, "attr", time.Now()},
{150.0, "attr", time.Now().Add(-1 * time.Minute)},
},
target: 200,
expectAlerts: 0,
},
{
// 250 exceeds the target and no minimum point count is required.
description: "AlertCondition=true, RequireMinPoints=false",
requireMinPoints: false,
values: [][]any{
{100.0, "attr", time.Now()},
{150.0, "attr", time.Now().Add(-1 * time.Minute)},
{250.0, "attr", time.Now().Add(-2 * time.Minute)},
},
target: 200,
expectAlerts: 1,
},
{
// 3 rows returned, 2 required.
description: "AlertCondition=true, RequireMinPoints=true, NumPoints=more_than_required",
requireMinPoints: true,
requiredNumPoints: 2,
values: [][]any{
{100.0, "attr", time.Now()},
{150.0, "attr", time.Now().Add(-1 * time.Minute)},
{250.0, "attr", time.Now().Add(-2 * time.Minute)},
},
target: 200,
expectAlerts: 1,
},
{
// 3 rows returned, 3 required: the boundary still evaluates.
description: "AlertCondition=true, RequireMinPoints=true, NumPoints=same_as_required",
requireMinPoints: true,
requiredNumPoints: 3,
values: [][]any{
{100.0, "attr", time.Now()},
{150.0, "attr", time.Now().Add(-1 * time.Minute)},
{250.0, "attr", time.Now().Add(-2 * time.Minute)},
},
target: 200,
expectAlerts: 1,
},
{
// 3 rows returned but 4 required: no alert is expected even though 250
// exceeds the target.
description: "AlertCondition=true, RequireMinPoints=true, NumPoints=insufficient",
requireMinPoints: true,
requiredNumPoints: 4,
values: [][]any{
{100.0, "attr", time.Now()},
{150.0, "attr", time.Now().Add(-1 * time.Minute)},
{250.0, "attr", time.Now().Add(-2 * time.Minute)},
},
target: 200,
expectAlerts: 0,
},
{
// The data query returns no rows at all.
description: "AlertCondition=true, RequireMinPoints=true, NumPoints=zero",
requireMinPoints: true,
requiredNumPoints: 4,
values: [][]any{},
target: 200,
expectAlerts: 0,
},
}
// Columns of the metric-name lookup result; this query is only mocked for
// version "v4" in the loop below.
validateMetricNameColumns := []cmock.ColumnType{
{Name: "metric_name", Type: "String"},
{Name: "toUInt8(__normalized)", Type: "UInt8"},
}
// Columns of the mocked data query result; each row in cases[i].values
// follows this order.
dataColumns := []cmock.ColumnType{
{Name: "value", Type: "Float64"},
{Name: "attr", Type: "String"},
{Name: "timestamp", Type: "DateTime"},
}
// TODO: handle tests for v5
for _, version := range []string{"v3", "v4"} {
postableRule.Version = version
for idx, c := range cases {
// A fresh logger and mocked telemetry store are created per case.
logger := instrumentationtest.New().Logger()
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
// Expectations are registered in this order: for "v4" the metric_name
// lookup first, then the data query.
// NOTE(review): presumably the mock consumes expectations in
// registration order; confirm before reordering these statements.
if version == "v4" {
telemetryStore.Mock().
ExpectQuery("SELECT metric_name, toUInt8(__normalized) .*").
WillReturnRows(cmock.NewRows(validateMetricNameColumns, [][]any{{"signoz_calls_total", 1}}))
}
telemetryStore.Mock().
ExpectQuery("SELECT any").
WillReturnRows(cmock.NewRows(dataColumns, c.values))
// rc is the same *RuleCondition pointer held by postableRule, so the
// assignments below update the shared rule definition for this case.
rc := postableRule.RuleCondition
rc.Target = &c.target
rc.RequireMinPoints = c.requireMinPoints
rc.RequiredNumPoints = c.requiredNumPoints
rc.Thresholds = &ruletypes.RuleThresholdData{
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{
{
Name: postableRule.AlertName,
TargetValue: &c.target,
MatchType: rc.MatchType,
CompareOp: rc.CompareOp,
},
},
}
options := clickhouseReader.NewOptions("primaryNamespace")
readerCache, err := cachetest.New(
cache.Config{
Provider: "memory",
Memory: cache.Memory{
NumCounters: 10 * 1000,
MaxCost: 1 << 26,
},
},
)
require.NoError(t, err)
prometheusProvider := prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheusProvider, "", time.Second, nil, readerCache, options)
// The construction error is checked inside the subtest so that a failure
// is attributed to the specific version/case.
rule, err := NewThresholdRule("some-id", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
t.Run(fmt.Sprintf("%d Version=%s, %s", idx, version, c.description), func(t *testing.T) {
require.NoError(t, err)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {v3.Delta: true},
}
// Eval returns the number of alerts found, compared against the case's
// expectation.
alertsFound, err := rule.Eval(context.Background(), time.Now())
require.NoError(t, err)
assert.Equal(t, c.expectAlerts, alertsFound, "case %d", idx)
})
}
}
}

View File

@@ -202,7 +202,15 @@ func (rc *RuleCondition) IsValid() bool {
return true
}
// QueryType is a short hand method to get query type
// ShouldEval reports whether the given series should be evaluated for alerts.
// It returns true when the receiver is nil or when the condition does not
// require a minimum number of points; otherwise it returns true only if the
// series holds at least RequiredNumPoints points.
func (rc *RuleCondition) ShouldEval(series *v3.Series) bool {
	// No condition, or no minimum-points requirement: nothing to gate on.
	if rc == nil || !rc.RequireMinPoints {
		return true
	}
	return len(series.Points) >= rc.RequiredNumPoints
}
// QueryType is a shorthand method to get query type
func (rc *RuleCondition) QueryType() v3.QueryType {
if rc.CompositeQuery != nil {
return rc.CompositeQuery.QueryType
@@ -219,10 +227,9 @@ func (rc *RuleCondition) String() string {
return string(data)
}
// prepareRuleGeneratorURL creates an appropriate url
// for the rule. the URL is sent in slack messages as well as
// to other systems and allows backtracking to the rule definition
// from the third party systems.
// PrepareRuleGeneratorURL creates an appropriate URL for the rule. The URL is
// sent in Slack messages as well as to other systems and allows backtracking
// to the rule definition from those third-party systems.
func PrepareRuleGeneratorURL(ruleId string, source string) string {
if source == "" {
return source

View File

@@ -164,8 +164,8 @@ func (r BasicRuleThresholds) Eval(series v3.Series, unit string, evalData EvalDa
alertHash := sampleLabels.Hash()
// check if alert is active and then check if recovery threshold matches
if evalData.HasActiveAlert(alertHash) {
smpl, matchesRecoveryThrehold := threshold.matchesRecoveryThreshold(series, unit)
if matchesRecoveryThrehold {
smpl, matchesRecoveryThreshold := threshold.matchesRecoveryThreshold(series, unit)
if matchesRecoveryThreshold {
smpl.Target = *threshold.TargetValue
smpl.RecoveryTarget = threshold.RecoveryTarget
smpl.TargetUnit = threshold.TargetUnit