Compare commits

...

3 Commits

Author            SHA1        Message                                             Date
srikanthccv       5e76de6f8d  chore: resolve conflicts                            2026-02-18 22:36:38 +05:30
Srikanth Chekuri  25e0c19ddb  Merge branch 'main' into remove-v4-support-rules    2026-02-14 23:51:21 +05:30
srikanthccv       8a6abb2f09  chore: remove support for older versions in rules   2026-02-14 23:48:09 +05:30
19 changed files with 1104 additions and 1964 deletions

View File

@@ -5,23 +5,16 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log/slog" "log/slog"
"math"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/SigNoz/signoz/ee/query-service/anomaly"
"github.com/SigNoz/signoz/pkg/cache" "github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/transition"
"github.com/SigNoz/signoz/pkg/types/ruletypes" "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer" "github.com/SigNoz/signoz/pkg/valuer"
querierV2 "github.com/SigNoz/signoz/pkg/query-service/app/querier/v2"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
"github.com/SigNoz/signoz/pkg/query-service/interfaces" "github.com/SigNoz/signoz/pkg/query-service/interfaces"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels" "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/query-service/utils/times" "github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp" "github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
@@ -32,6 +25,8 @@ import (
querierV5 "github.com/SigNoz/signoz/pkg/querier" querierV5 "github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/ee/query-service/anomaly"
anomalyV2 "github.com/SigNoz/signoz/ee/anomaly" anomalyV2 "github.com/SigNoz/signoz/ee/anomaly"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -48,17 +43,12 @@ type AnomalyRule struct {
reader interfaces.Reader reader interfaces.Reader
// querierV2 is used for alerts created after the introduction of new metrics query builder // querierV5 is the query builder v5 querier used for all alert rule evaluation
querierV2 interfaces.Querier
// querierV5 is used for alerts migrated after the introduction of new query builder
querierV5 querierV5.Querier querierV5 querierV5.Querier
provider anomaly.Provider
providerV2 anomalyV2.Provider providerV2 anomalyV2.Provider
version string logger *slog.Logger
logger *slog.Logger
seasonality anomaly.Seasonality seasonality anomaly.Seasonality
} }
@@ -102,34 +92,6 @@ func NewAnomalyRule(
logger.Info("using seasonality", "seasonality", t.seasonality.String()) logger.Info("using seasonality", "seasonality", t.seasonality.String())
querierOptsV2 := querierV2.QuerierOptions{
Reader: reader,
Cache: cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
}
t.querierV2 = querierV2.NewQuerier(querierOptsV2)
t.reader = reader
if t.seasonality == anomaly.SeasonalityHourly {
t.provider = anomaly.NewHourlyProvider(
anomaly.WithCache[*anomaly.HourlyProvider](cache),
anomaly.WithKeyGenerator[*anomaly.HourlyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.HourlyProvider](reader),
)
} else if t.seasonality == anomaly.SeasonalityDaily {
t.provider = anomaly.NewDailyProvider(
anomaly.WithCache[*anomaly.DailyProvider](cache),
anomaly.WithKeyGenerator[*anomaly.DailyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.DailyProvider](reader),
)
} else if t.seasonality == anomaly.SeasonalityWeekly {
t.provider = anomaly.NewWeeklyProvider(
anomaly.WithCache[*anomaly.WeeklyProvider](cache),
anomaly.WithKeyGenerator[*anomaly.WeeklyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.WeeklyProvider](reader),
)
}
if t.seasonality == anomaly.SeasonalityHourly { if t.seasonality == anomaly.SeasonalityHourly {
t.providerV2 = anomalyV2.NewHourlyProvider( t.providerV2 = anomalyV2.NewHourlyProvider(
anomalyV2.WithQuerier[*anomalyV2.HourlyProvider](querierV5), anomalyV2.WithQuerier[*anomalyV2.HourlyProvider](querierV5),
@@ -148,7 +110,7 @@ func NewAnomalyRule(
} }
t.querierV5 = querierV5 t.querierV5 = querierV5
t.version = p.Version t.reader = reader
t.logger = logger t.logger = logger
return &t, nil return &t, nil
} }
@@ -157,36 +119,9 @@ func (r *AnomalyRule) Type() ruletypes.RuleType {
return RuleTypeAnomaly return RuleTypeAnomaly
} }
func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v3.QueryRangeParamsV3, error) { func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) (*qbtypes.QueryRangeRequest, error) {
r.logger.InfoContext( r.logger.InfoContext(ctx, "prepare query range request", "ts", ts.UnixMilli(), "eval_window", r.EvalWindow().Milliseconds(), "eval_delay", r.EvalDelay().Milliseconds())
ctx, "prepare query range request v4", "ts", ts.UnixMilli(), "eval_window", r.EvalWindow().Milliseconds(), "eval_delay", r.EvalDelay().Milliseconds(),
)
st, en := r.Timestamps(ts)
start := st.UnixMilli()
end := en.UnixMilli()
compositeQuery := r.Condition().CompositeQuery
if compositeQuery.PanelType != v3.PanelTypeGraph {
compositeQuery.PanelType = v3.PanelTypeGraph
}
// default mode
return &v3.QueryRangeParamsV3{
Start: start,
End: end,
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
CompositeQuery: compositeQuery,
Variables: make(map[string]interface{}, 0),
NoCache: false,
}, nil
}
func (r *AnomalyRule) prepareQueryRangeV5(ctx context.Context, ts time.Time) (*qbtypes.QueryRangeRequest, error) {
r.logger.InfoContext(ctx, "prepare query range request v5", "ts", ts.UnixMilli(), "eval_window", r.EvalWindow().Milliseconds(), "eval_delay", r.EvalDelay().Milliseconds())
startTs, endTs := r.Timestamps(ts) startTs, endTs := r.Timestamps(ts)
start, end := startTs.UnixMilli(), endTs.UnixMilli() start, end := startTs.UnixMilli(), endTs.UnixMilli()
@@ -215,60 +150,6 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = r.PopulateTemporality(ctx, orgID, params)
if err != nil {
return nil, fmt.Errorf("internal error while setting temporality")
}
anomalies, err := r.provider.GetAnomalies(ctx, orgID, &anomaly.GetAnomaliesRequest{
Params: params,
Seasonality: r.seasonality,
})
if err != nil {
return nil, err
}
var queryResult *v3.Result
for _, result := range anomalies.Results {
if result.QueryName == r.GetSelectedQuery() {
queryResult = result
break
}
}
hasData := len(queryResult.AnomalyScores) > 0
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
return ruletypes.Vector{*missingDataAlert}, nil
}
var resultVector ruletypes.Vector
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores)
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
for _, series := range queryResult.AnomalyScores {
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
results, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
SendUnmatched: r.ShouldSendUnmatched(),
})
if err != nil {
return nil, err
}
resultVector = append(resultVector, results...)
}
return resultVector, nil
}
func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRangeV5(ctx, ts)
if err != nil {
return nil, err
}
anomalies, err := r.providerV2.GetAnomalies(ctx, orgID, &anomalyV2.AnomaliesRequest{ anomalies, err := r.providerV2.GetAnomalies(ctx, orgID, &anomalyV2.AnomaliesRequest{
Params: *params, Params: *params,
@@ -290,20 +171,25 @@ func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID,
r.logger.WarnContext(ctx, "nil qb result", "ts", ts.UnixMilli()) r.logger.WarnContext(ctx, "nil qb result", "ts", ts.UnixMilli())
} }
queryResult := transition.ConvertV5TimeSeriesDataToV4Result(qbResult) var anomalyScores []*qbtypes.TimeSeries
if qbResult != nil {
for _, bucket := range qbResult.Aggregations {
anomalyScores = append(anomalyScores, bucket.AnomalyScores...)
}
}
hasData := len(queryResult.AnomalyScores) > 0 hasData := len(anomalyScores) > 0
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil { if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
return ruletypes.Vector{*missingDataAlert}, nil return ruletypes.Vector{*missingDataAlert}, nil
} }
var resultVector ruletypes.Vector var resultVector ruletypes.Vector
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores) scoresJSON, _ := json.Marshal(anomalyScores)
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON)) r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
// Filter out new series if newGroupEvalDelay is configured // Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.AnomalyScores seriesToProcess := anomalyScores
if r.ShouldSkipNewGroups() { if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess) filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
// In case of error we log the error and continue with the original series // In case of error we log the error and continue with the original series
@@ -316,7 +202,7 @@ func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID,
for _, series := range seriesToProcess { for _, series := range seriesToProcess {
if !r.Condition().ShouldEval(series) { if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints) r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Values), "requiredPoints", r.Condition().RequiredNumPoints)
continue continue
} }
results, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{ results, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
@@ -337,16 +223,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
valueFormatter := formatter.FromUnit(r.Unit()) valueFormatter := formatter.FromUnit(r.Unit())
var res ruletypes.Vector res, err := r.buildAndRunQuery(ctx, r.OrgID(), ts)
var err error
if r.version == "v5" {
r.logger.InfoContext(ctx, "running v5 query")
res, err = r.buildAndRunQueryV5(ctx, r.OrgID(), ts)
} else {
r.logger.InfoContext(ctx, "running v4 query")
res, err = r.buildAndRunQuery(ctx, r.OrgID(), ts)
}
if err != nil { if err != nil {
return 0, err return 0, err
} }
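For context on the hunks above: with the v4 evaluation path gone, the rule reads anomaly scores straight out of the v5 query-builder result instead of converting it back to a v4 shape via the transition package. A minimal sketch of that flattening step in Go, assuming the field layout visible in this diff (qbtypes.TimeSeriesData.Aggregations holding AggregationBucket.AnomalyScores); the helper name collectAnomalyScores is illustrative and not part of the change:

// collectAnomalyScores flattens every aggregation bucket of a v5 result into a
// single slice of series, mirroring the loop introduced in buildAndRunQuery above.
// qbtypes refers to pkg/types/querybuildertypes/querybuildertypesv5.
func collectAnomalyScores(result *qbtypes.TimeSeriesData) []*qbtypes.TimeSeries {
	if result == nil {
		// A nil query-builder result yields no scores, which feeds the
		// missing-data handling (HandleMissingDataAlert) with hasData == false.
		return nil
	}
	var scores []*qbtypes.TimeSeries
	for _, bucket := range result.Aggregations {
		scores = append(scores, bucket.AnomalyScores...)
	}
	return scores
}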

View File

@@ -8,28 +8,27 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/SigNoz/signoz/ee/query-service/anomaly" anomalyV2 "github.com/SigNoz/signoz/ee/anomaly"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest" "github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader" "github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest" "github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/ruletypes" "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/valuer" "github.com/SigNoz/signoz/pkg/valuer"
) )
// mockAnomalyProvider is a mock implementation of anomaly.Provider for testing. // mockAnomalyProviderV2 is a mock implementation of anomalyV2.Provider for testing.
// We need this because the anomaly provider makes 6 different queries for various type mockAnomalyProviderV2 struct {
// time periods (current, past period, current season, past season, past 2 seasons, responses []*anomalyV2.AnomaliesResponse
// past 3 seasons), making it cumbersome to create mock data.
type mockAnomalyProvider struct {
responses []*anomaly.GetAnomaliesResponse
callCount int callCount int
} }
func (m *mockAnomalyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *anomaly.GetAnomaliesRequest) (*anomaly.GetAnomaliesResponse, error) { func (m *mockAnomalyProviderV2) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *anomalyV2.AnomaliesRequest) (*anomalyV2.AnomaliesResponse, error) {
if m.callCount >= len(m.responses) { if m.callCount >= len(m.responses) {
return &anomaly.GetAnomaliesResponse{Results: []*v3.Result{}}, nil return &anomalyV2.AnomaliesResponse{Results: []*qbtypes.TimeSeriesData{}}, nil
} }
resp := m.responses[m.callCount] resp := m.responses[m.callCount]
m.callCount++ m.callCount++
@@ -82,11 +81,11 @@ func TestAnomalyRule_NoData_AlertOnAbsent(t *testing.T) {
}, },
} }
responseNoData := &anomaly.GetAnomaliesResponse{ responseNoData := &anomalyV2.AnomaliesResponse{
Results: []*v3.Result{ Results: []*qbtypes.TimeSeriesData{
{ {
QueryName: "A", QueryName: "A",
AnomalyScores: []*v3.Series{}, Aggregations: []*qbtypes.AggregationBucket{},
}, },
}, },
} }
@@ -129,8 +128,8 @@ func TestAnomalyRule_NoData_AlertOnAbsent(t *testing.T) {
) )
require.NoError(t, err) require.NoError(t, err)
rule.provider = &mockAnomalyProvider{ rule.providerV2 = &mockAnomalyProviderV2{
responses: []*anomaly.GetAnomaliesResponse{responseNoData}, responses: []*anomalyV2.AnomaliesResponse{responseNoData},
} }
alertsFound, err := rule.Eval(context.Background(), evalTime) alertsFound, err := rule.Eval(context.Background(), evalTime)
@@ -190,11 +189,11 @@ func TestAnomalyRule_NoData_AbsentFor(t *testing.T) {
}, },
} }
responseNoData := &anomaly.GetAnomaliesResponse{ responseNoData := &anomalyV2.AnomaliesResponse{
Results: []*v3.Result{ Results: []*qbtypes.TimeSeriesData{
{ {
QueryName: "A", QueryName: "A",
AnomalyScores: []*v3.Series{}, Aggregations: []*qbtypes.AggregationBucket{},
}, },
}, },
} }
@@ -228,16 +227,22 @@ func TestAnomalyRule_NoData_AbsentFor(t *testing.T) {
t1 := baseTime.Add(5 * time.Minute) t1 := baseTime.Add(5 * time.Minute)
t2 := t1.Add(c.timeBetweenEvals) t2 := t1.Add(c.timeBetweenEvals)
responseWithData := &anomaly.GetAnomaliesResponse{ responseWithData := &anomalyV2.AnomaliesResponse{
Results: []*v3.Result{ Results: []*qbtypes.TimeSeriesData{
{ {
QueryName: "A", QueryName: "A",
AnomalyScores: []*v3.Series{ Aggregations: []*qbtypes.AggregationBucket{
{ {
Labels: map[string]string{"test": "label"}, AnomalyScores: []*qbtypes.TimeSeries{
Points: []v3.Point{ {
{Timestamp: baseTime.UnixMilli(), Value: 1.0}, Labels: []*qbtypes.Label{
{Timestamp: baseTime.Add(time.Minute).UnixMilli(), Value: 1.5}, {Key: telemetrytypes.TelemetryFieldKey{Name: "test"}, Value: "label"},
},
Values: []*qbtypes.TimeSeriesValue{
{Timestamp: baseTime.UnixMilli(), Value: 1.0},
{Timestamp: baseTime.Add(time.Minute).UnixMilli(), Value: 1.5},
},
},
}, },
}, },
}, },
@@ -252,8 +257,8 @@ func TestAnomalyRule_NoData_AbsentFor(t *testing.T) {
rule, err := NewAnomalyRule("test-anomaly-rule", valuer.GenerateUUID(), &postableRule, reader, nil, logger, nil) rule, err := NewAnomalyRule("test-anomaly-rule", valuer.GenerateUUID(), &postableRule, reader, nil, logger, nil)
require.NoError(t, err) require.NoError(t, err)
rule.provider = &mockAnomalyProvider{ rule.providerV2 = &mockAnomalyProviderV2{
responses: []*anomaly.GetAnomaliesResponse{responseWithData, responseNoData}, responses: []*anomalyV2.AnomaliesResponse{responseWithData, responseNoData},
} }
alertsFound1, err := rule.Eval(context.Background(), t1) alertsFound1, err := rule.Eval(context.Background(), t1)
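To make the updated fixtures above easier to scan, here is a hedged sketch of how a one-series v5 response for the mock provider can be assembled; every field name is taken from this diff, but the helper singleSeriesResponse itself is illustrative only:

// singleSeriesResponse builds an AnomaliesResponse carrying one series for query "A",
// with the supplied (timestamp, value) points acting as its anomaly scores.
func singleSeriesResponse(points []*qbtypes.TimeSeriesValue) *anomalyV2.AnomaliesResponse {
	return &anomalyV2.AnomaliesResponse{
		Results: []*qbtypes.TimeSeriesData{
			{
				QueryName: "A",
				Aggregations: []*qbtypes.AggregationBucket{
					{
						AnomalyScores: []*qbtypes.TimeSeries{
							{
								Labels: []*qbtypes.Label{
									{Key: telemetrytypes.TelemetryFieldKey{Name: "test"}, Value: "label"},
								},
								Values: points,
							},
						},
					},
				},
			},
		},
	}
}

Wiring it into a test then mirrors the diff: rule.providerV2 = &mockAnomalyProviderV2{responses: []*anomalyV2.AnomaliesResponse{singleSeriesResponse(points)}}.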

View File

@@ -6,7 +6,7 @@ import logEvent from 'api/common/logEvent';
import PromQLIcon from 'assets/Dashboard/PromQl'; import PromQLIcon from 'assets/Dashboard/PromQl';
import { QueryBuilderV2 } from 'components/QueryBuilderV2/QueryBuilderV2'; import { QueryBuilderV2 } from 'components/QueryBuilderV2/QueryBuilderV2';
import { ALERTS_DATA_SOURCE_MAP } from 'constants/alerts'; import { ALERTS_DATA_SOURCE_MAP } from 'constants/alerts';
import { ENTITY_VERSION_V4 } from 'constants/app'; import { ENTITY_VERSION_V5 } from 'constants/app';
import { PANEL_TYPES } from 'constants/queryBuilder'; import { PANEL_TYPES } from 'constants/queryBuilder';
import { QBShortcuts } from 'constants/shortcuts/QBShortcuts'; import { QBShortcuts } from 'constants/shortcuts/QBShortcuts';
import RunQueryBtn from 'container/QueryBuilder/components/RunQueryBtn/RunQueryBtn'; import RunQueryBtn from 'container/QueryBuilder/components/RunQueryBtn/RunQueryBtn';
@@ -63,12 +63,8 @@ function QuerySection({
signalSource: signalSource === 'meter' ? 'meter' : '', signalSource: signalSource === 'meter' ? 'meter' : '',
}} }}
showTraceOperator={alertType === AlertTypes.TRACES_BASED_ALERT} showTraceOperator={alertType === AlertTypes.TRACES_BASED_ALERT}
showFunctions={ showFunctions
(alertType === AlertTypes.METRICS_BASED_ALERT && version={ENTITY_VERSION_V5}
alertDef.version === ENTITY_VERSION_V4) ||
alertType === AlertTypes.LOGS_BASED_ALERT
}
version={alertDef.version || 'v3'}
onSignalSourceChange={handleSignalSourceChange} onSignalSourceChange={handleSignalSourceChange}
signalSourceChangeEnabled signalSourceChangeEnabled
/> />

View File

@@ -625,7 +625,7 @@ func (r *BaseRule) extractMetricAndGroupBys(ctx context.Context) (map[string][]s
// FilterNewSeries filters out items that are too new based on metadata first_seen timestamps. // FilterNewSeries filters out items that are too new based on metadata first_seen timestamps.
// Returns the filtered series (old ones) excluding new series that are still within the grace period. // Returns the filtered series (old ones) excluding new series that are still within the grace period.
func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*v3.Series) ([]*v3.Series, error) { func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*qbtypes.TimeSeries) ([]*qbtypes.TimeSeries, error) {
// Extract metric names and groupBy keys // Extract metric names and groupBy keys
metricToGroupedFields, err := r.extractMetricAndGroupBys(ctx) metricToGroupedFields, err := r.extractMetricAndGroupBys(ctx)
if err != nil { if err != nil {
@@ -642,7 +642,7 @@ func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*
seriesIdxToLookupKeys := make(map[int][]telemetrytypes.MetricMetadataLookupKey) // series index -> lookup keys seriesIdxToLookupKeys := make(map[int][]telemetrytypes.MetricMetadataLookupKey) // series index -> lookup keys
for i := 0; i < len(series); i++ { for i := 0; i < len(series); i++ {
metricLabelMap := series[i].Labels metricLabelMap := series[i].LabelsMap()
// Collect groupBy attribute-value pairs for this series // Collect groupBy attribute-value pairs for this series
seriesKeys := make([]telemetrytypes.MetricMetadataLookupKey, 0) seriesKeys := make([]telemetrytypes.MetricMetadataLookupKey, 0)
@@ -689,7 +689,7 @@ func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*
} }
// Filter series based on first_seen + delay // Filter series based on first_seen + delay
filteredSeries := make([]*v3.Series, 0, len(series)) filteredSeries := make([]*qbtypes.TimeSeries, 0, len(series))
evalTimeMs := ts.UnixMilli() evalTimeMs := ts.UnixMilli()
newGroupEvalDelayMs := r.newGroupEvalDelay.Milliseconds() newGroupEvalDelayMs := r.newGroupEvalDelay.Milliseconds()
@@ -727,7 +727,7 @@ func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*
// Check if first_seen + delay has passed // Check if first_seen + delay has passed
if maxFirstSeen+newGroupEvalDelayMs > evalTimeMs { if maxFirstSeen+newGroupEvalDelayMs > evalTimeMs {
// Still within grace period, skip this series // Still within grace period, skip this series
r.logger.InfoContext(ctx, "Skipping new series", "rule_name", r.Name(), "series_idx", i, "max_first_seen", maxFirstSeen, "eval_time_ms", evalTimeMs, "delay_ms", newGroupEvalDelayMs, "labels", series[i].Labels) r.logger.InfoContext(ctx, "Skipping new series", "rule_name", r.Name(), "series_idx", i, "max_first_seen", maxFirstSeen, "eval_time_ms", evalTimeMs, "delay_ms", newGroupEvalDelayMs, "labels", series[i].LabelsMap())
continue continue
} }
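The FilterNewSeries change above swaps the map-based v3 labels for the v5 series type and reads labels through LabelsMap(). A rough sketch of the per-series decision, assuming LabelsMap() returns map[string]string and that the caller has already resolved first_seen timestamps (in milliseconds) from metric metadata; isWithinGracePeriod and the flat firstSeen lookup are illustrative simplifications, not the repository's actual lookup-key logic:

// isWithinGracePeriod reports whether a series is still "new": its latest
// first_seen plus the configured newGroupEvalDelay has not yet elapsed at eval time.
func isWithinGracePeriod(series *qbtypes.TimeSeries, firstSeen map[string]int64, evalTime time.Time, delay time.Duration) bool {
	var maxFirstSeen int64
	for key, value := range series.LabelsMap() {
		if fs, ok := firstSeen[key+"="+value]; ok && fs > maxFirstSeen {
			maxFirstSeen = fs
		}
	}
	if maxFirstSeen == 0 {
		// No metadata for any label: we cannot decide, so the series is kept,
		// matching the "can't decide if it's new or old" cases in the tests below.
		return false
	}
	return maxFirstSeen+delay.Milliseconds() > evalTime.UnixMilli()
}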

View File

@@ -26,33 +26,33 @@ import (
"github.com/SigNoz/signoz/pkg/valuer" "github.com/SigNoz/signoz/pkg/valuer"
) )
// createTestSeries creates a *v3.Series with the given labels and optional points // createTestSeries creates a *qbtypes.TimeSeries with the given labels and optional values
// so we don't exactly need the points in the series because the labels are used to determine if the series is new or old // so we don't exactly need the points in the series because the labels are used to determine if the series is new or old
// we use the labels to create a lookup key for the series and then check the first_seen timestamp for the series in the metadata table // we use the labels to create a lookup key for the series and then check the first_seen timestamp for the series in the metadata table
func createTestSeries(labels map[string]string, points []v3.Point) *v3.Series { func createTestSeries(labels map[string]string, points []*qbtypes.TimeSeriesValue) *qbtypes.TimeSeries {
if points == nil { if points == nil {
points = []v3.Point{} points = []*qbtypes.TimeSeriesValue{}
} }
return &v3.Series{ lbls := make([]*qbtypes.Label, 0, len(labels))
Labels: labels, for k, v := range labels {
Points: points, lbls = append(lbls, &qbtypes.Label{Key: telemetrytypes.TelemetryFieldKey{Name: k}, Value: v})
}
return &qbtypes.TimeSeries{
Labels: lbls,
Values: points,
} }
} }
// seriesEqual compares two v3.Series by their labels // seriesEqual compares two *qbtypes.TimeSeries by their labels
// Returns true if the series have the same labels (order doesn't matter) // Returns true if the series have the same labels (order doesn't matter)
func seriesEqual(s1, s2 *v3.Series) bool { func seriesEqual(s1, s2 *qbtypes.TimeSeries) bool {
if s1 == nil && s2 == nil { m1 := s1.LabelsMap()
return true m2 := s2.LabelsMap()
} if len(m1) != len(m2) {
if s1 == nil || s2 == nil {
return false return false
} }
if len(s1.Labels) != len(s2.Labels) { for k, v := range m1 {
return false if m2[k] != v {
}
for k, v := range s1.Labels {
if s2.Labels[k] != v {
return false return false
} }
} }
@@ -149,11 +149,11 @@ func createPostableRule(compositeQuery *v3.CompositeQuery) ruletypes.PostableRul
type filterNewSeriesTestCase struct { type filterNewSeriesTestCase struct {
name string name string
compositeQuery *v3.CompositeQuery compositeQuery *v3.CompositeQuery
series []*v3.Series series []*qbtypes.TimeSeries
firstSeenMap map[telemetrytypes.MetricMetadataLookupKey]int64 firstSeenMap map[telemetrytypes.MetricMetadataLookupKey]int64
newGroupEvalDelay valuer.TextDuration newGroupEvalDelay valuer.TextDuration
evalTime time.Time evalTime time.Time
expectedFiltered []*v3.Series // series that should be in the final filtered result (old enough) expectedFiltered []*qbtypes.TimeSeries // series that should be in the final filtered result (old enough)
expectError bool expectError bool
} }
@@ -193,7 +193,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-new", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-new", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-missing", "env": "stage"}, nil), createTestSeries(map[string]string{"service_name": "svc-missing", "env": "stage"}, nil),
@@ -205,7 +205,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
), ),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-missing", "env": "stage"}, nil), createTestSeries(map[string]string{"service_name": "svc-missing", "env": "stage"}, nil),
}, // svc-old and svc-missing should be included; svc-new is filtered out }, // svc-old and svc-missing should be included; svc-new is filtered out
@@ -227,7 +227,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc-new1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-new1", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-new2", "env": "stage"}, nil), createTestSeries(map[string]string{"service_name": "svc-new2", "env": "stage"}, nil),
}, },
@@ -237,7 +237,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
), ),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{}, // all should be filtered out (new series) expectedFiltered: []*qbtypes.TimeSeries{}, // all should be filtered out (new series)
}, },
{ {
name: "all old series - ClickHouse query", name: "all old series - ClickHouse query",
@@ -254,7 +254,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc-old1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-old1", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-old2", "env": "stage"}, nil), createTestSeries(map[string]string{"service_name": "svc-old2", "env": "stage"}, nil),
}, },
@@ -264,7 +264,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
), ),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc-old1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-old1", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-old2", "env": "stage"}, nil), createTestSeries(map[string]string{"service_name": "svc-old2", "env": "stage"}, nil),
}, // all should be included (old series) }, // all should be included (old series)
@@ -292,13 +292,13 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, },
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64), firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, // early return, no filtering - all series included }, // early return, no filtering - all series included
}, },
@@ -322,13 +322,13 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, },
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64), firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, // early return, no filtering - all series included }, // early return, no filtering - all series included
}, },
@@ -358,13 +358,13 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"status": "200"}, nil), // no service_name or env createTestSeries(map[string]string{"status": "200"}, nil), // no service_name or env
}, },
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64), firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"status": "200"}, nil), createTestSeries(map[string]string{"status": "200"}, nil),
}, // series included as we can't decide if it's new or old }, // series included as we can't decide if it's new or old
}, },
@@ -385,7 +385,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-no-metadata", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-no-metadata", "env": "prod"}, nil),
}, },
@@ -393,7 +393,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
// svc-no-metadata has no entry in firstSeenMap // svc-no-metadata has no entry in firstSeenMap
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-no-metadata", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-no-metadata", "env": "prod"}, nil),
}, // both should be included - svc-old is old, svc-no-metadata can't be decided }, // both should be included - svc-old is old, svc-no-metadata can't be decided
@@ -413,7 +413,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc-partial", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-partial", "env": "prod"}, nil),
}, },
// Only provide metadata for service_name, not env // Only provide metadata for service_name, not env
@@ -423,7 +423,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc-partial", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc-partial", "env": "prod"}, nil),
}, // has some metadata, uses max first_seen which is old }, // has some metadata, uses max first_seen which is old
}, },
@@ -453,11 +453,11 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{}, series: []*qbtypes.TimeSeries{},
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64), firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{}, expectedFiltered: []*qbtypes.TimeSeries{},
}, },
{ {
name: "zero delay - Builder", name: "zero delay - Builder",
@@ -485,13 +485,13 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, },
firstSeenMap: createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc1", "prod"), firstSeenMap: createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc1", "prod"),
newGroupEvalDelay: valuer.TextDuration{}, // zero delay newGroupEvalDelay: valuer.TextDuration{}, // zero delay
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, // with zero delay, all series pass }, // with zero delay, all series pass
}, },
@@ -526,7 +526,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, },
firstSeenMap: mergeFirstSeenMaps( firstSeenMap: mergeFirstSeenMaps(
@@ -535,7 +535,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
), ),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, },
}, },
@@ -565,7 +565,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil), createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, },
// service_name is old, env is new - should use max (new) // service_name is old, env is new - should use max (new)
@@ -575,7 +575,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
), ),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{}, // max first_seen is new, so should be filtered out expectedFiltered: []*qbtypes.TimeSeries{}, // max first_seen is new, so should be filtered out
}, },
{ {
name: "Logs query - should skip filtering and return empty skip indexes", name: "Logs query - should skip filtering and return empty skip indexes",
@@ -600,14 +600,14 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1"}, nil), createTestSeries(map[string]string{"service_name": "svc1"}, nil),
createTestSeries(map[string]string{"service_name": "svc2"}, nil), createTestSeries(map[string]string{"service_name": "svc2"}, nil),
}, },
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64), firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1"}, nil), createTestSeries(map[string]string{"service_name": "svc1"}, nil),
createTestSeries(map[string]string{"service_name": "svc2"}, nil), createTestSeries(map[string]string{"service_name": "svc2"}, nil),
}, // Logs queries should return early, no filtering - all included }, // Logs queries should return early, no filtering - all included
@@ -635,14 +635,14 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
}, },
}, },
}, },
series: []*v3.Series{ series: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1"}, nil), createTestSeries(map[string]string{"service_name": "svc1"}, nil),
createTestSeries(map[string]string{"service_name": "svc2"}, nil), createTestSeries(map[string]string{"service_name": "svc2"}, nil),
}, },
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64), firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay, newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime, evalTime: defaultEvalTime,
expectedFiltered: []*v3.Series{ expectedFiltered: []*qbtypes.TimeSeries{
createTestSeries(map[string]string{"service_name": "svc1"}, nil), createTestSeries(map[string]string{"service_name": "svc1"}, nil),
createTestSeries(map[string]string{"service_name": "svc2"}, nil), createTestSeries(map[string]string{"service_name": "svc2"}, nil),
}, // Traces queries should return early, no filtering - all included }, // Traces queries should return early, no filtering - all included
@@ -724,14 +724,14 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
// Build a map to count occurrences of each unique label combination in expected series // Build a map to count occurrences of each unique label combination in expected series
expectedCounts := make(map[string]int) expectedCounts := make(map[string]int)
for _, expected := range tt.expectedFiltered { for _, expected := range tt.expectedFiltered {
key := labelsKey(expected.Labels) key := labelsKey(expected.LabelsMap())
expectedCounts[key]++ expectedCounts[key]++
} }
// Build a map to count occurrences of each unique label combination in filtered series // Build a map to count occurrences of each unique label combination in filtered series
actualCounts := make(map[string]int) actualCounts := make(map[string]int)
for _, filtered := range filteredSeries { for _, filtered := range filteredSeries {
key := labelsKey(filtered.Labels) key := labelsKey(filtered.LabelsMap())
actualCounts[key]++ actualCounts[key]++
} }
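The comparison loops above key series by their labels; labelsKey itself is referenced but not shown in this hunk. One possible shape of such a helper, assuming it only needs a deterministic string per label set (sort and strings from the standard library); the real implementation in the repository may differ:

// labelsKey builds an order-independent string key from a label map so that
// series with identical labels collapse to the same counter entry.
func labelsKey(labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		b.WriteString(k)
		b.WriteByte('=')
		b.WriteString(labels[k])
		b.WriteByte(',')
	}
	return b.String()
}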

View File

@@ -12,19 +12,18 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/formatter" "github.com/SigNoz/signoz/pkg/query-service/formatter"
"github.com/SigNoz/signoz/pkg/query-service/interfaces" "github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels" qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/query-service/utils/times" "github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp" "github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/ruletypes" "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer" "github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
) )
type PromRule struct { type PromRule struct {
*BaseRule *BaseRule
version string
prometheus prometheus.Prometheus prometheus prometheus.Prometheus
} }
@@ -48,7 +47,6 @@ func NewPromRule(
p := PromRule{ p := PromRule{
BaseRule: baseRule, BaseRule: baseRule,
version: postableRule.Version,
prometheus: prometheus, prometheus: prometheus,
} }
p.logger = logger p.logger = logger
@@ -83,48 +81,30 @@ func (r *PromRule) GetSelectedQuery() string {
} }
func (r *PromRule) getPqlQuery() (string, error) { func (r *PromRule) getPqlQuery() (string, error) {
if r.version == "v5" { if len(r.ruleCondition.CompositeQuery.Queries) > 0 {
if len(r.ruleCondition.CompositeQuery.Queries) > 0 { selectedQuery := r.GetSelectedQuery()
selectedQuery := r.GetSelectedQuery() for _, item := range r.ruleCondition.CompositeQuery.Queries {
for _, item := range r.ruleCondition.CompositeQuery.Queries { switch item.Type {
switch item.Type { case qbtypes.QueryTypePromQL:
case qbtypes.QueryTypePromQL: promQuery, ok := item.Spec.(qbtypes.PromQuery)
promQuery, ok := item.Spec.(qbtypes.PromQuery) if !ok {
if !ok { return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", item.Spec)
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", item.Spec) }
} if promQuery.Name == selectedQuery {
if promQuery.Name == selectedQuery { return promQuery.Query, nil
return promQuery.Query, nil
}
} }
} }
} }
return "", fmt.Errorf("invalid promql rule setup")
} }
return "", fmt.Errorf("invalid promql rule setup")
if r.ruleCondition.CompositeQuery.QueryType == v3.QueryTypePromQL {
if len(r.ruleCondition.CompositeQuery.PromQueries) > 0 {
selectedQuery := r.GetSelectedQuery()
if promQuery, ok := r.ruleCondition.CompositeQuery.PromQueries[selectedQuery]; ok {
query := promQuery.Query
if query == "" {
return query, fmt.Errorf("a promquery needs to be set for this rule to function")
}
return query, nil
}
}
}
return "", fmt.Errorf("invalid promql rule query")
} }
func (r *PromRule) matrixToV3Series(res promql.Matrix) []*v3.Series { func matrixToTimeSeries(res promql.Matrix) []*qbtypes.TimeSeries {
v3Series := make([]*v3.Series, 0, len(res)) result := make([]*qbtypes.TimeSeries, 0, len(res))
for _, series := range res { for _, series := range res {
commonSeries := toCommonSeries(series) result = append(result, promSeriesToTimeSeries(series))
v3Series = append(v3Series, &commonSeries)
} }
return v3Series return result
} }
func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletypes.Vector, error) { func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletypes.Vector, error) {
@@ -143,31 +123,31 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
return nil, err return nil, err
} }
matrixToProcess := r.matrixToV3Series(res) seriesToProcess := matrixToTimeSeries(res)
hasData := len(matrixToProcess) > 0 hasData := len(seriesToProcess) > 0
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil { if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
return ruletypes.Vector{*missingDataAlert}, nil return ruletypes.Vector{*missingDataAlert}, nil
} }
// Filter out new series if newGroupEvalDelay is configured // Filter out new series if newGroupEvalDelay is configured
if r.ShouldSkipNewGroups() { if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, matrixToProcess) filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
// In case of error we log the error and continue with the original series // In case of error we log the error and continue with the original series
if filterErr != nil { if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name()) r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
} else { } else {
matrixToProcess = filteredSeries seriesToProcess = filteredSeries
} }
} }
var resultVector ruletypes.Vector var resultVector ruletypes.Vector
for _, series := range matrixToProcess { for _, series := range seriesToProcess {
if !r.Condition().ShouldEval(series) { if !r.Condition().ShouldEval(series) {
r.logger.InfoContext( r.logger.InfoContext(
ctx, "not enough data points to evaluate series, skipping", ctx, "not enough data points to evaluate series, skipping",
"rule_id", r.ID(), "num_points", len(series.Points), "required_points", r.Condition().RequiredNumPoints, "rule_id", r.ID(), "num_points", len(series.Values), "required_points", r.Condition().RequiredNumPoints,
) )
continue continue
} }
@@ -454,26 +434,25 @@ func (r *PromRule) RunAlertQuery(ctx context.Context, qs string, start, end time
} }
} }
func toCommonSeries(series promql.Series) v3.Series { func promSeriesToTimeSeries(series promql.Series) *qbtypes.TimeSeries {
commonSeries := v3.Series{ ts := &qbtypes.TimeSeries{
Labels: make(map[string]string), Labels: make([]*qbtypes.Label, 0, len(series.Metric)),
LabelsArray: make([]map[string]string, 0), Values: make([]*qbtypes.TimeSeriesValue, 0, len(series.Floats)),
Points: make([]v3.Point, 0),
} }
for _, lbl := range series.Metric { for _, lbl := range series.Metric {
commonSeries.Labels[lbl.Name] = lbl.Value ts.Labels = append(ts.Labels, &qbtypes.Label{
commonSeries.LabelsArray = append(commonSeries.LabelsArray, map[string]string{ Key: telemetrytypes.TelemetryFieldKey{Name: lbl.Name},
lbl.Name: lbl.Value, Value: lbl.Value,
}) })
} }
for _, f := range series.Floats { for _, f := range series.Floats {
commonSeries.Points = append(commonSeries.Points, v3.Point{ ts.Values = append(ts.Values, &qbtypes.TimeSeriesValue{
Timestamp: f.T, Timestamp: f.T,
Value: f.F, Value: f.F,
}) })
} }
return commonSeries return ts
} }
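A small usage sketch of the promSeriesToTimeSeries conversion introduced above, assuming the Prometheus promql and model/labels packages; the metric names and values are placeholders:

// Convert a two-point PromQL series into the v5 TimeSeries shape.
series := promql.Series{
	Metric: labels.FromStrings("service_name", "checkout", "env", "prod"),
	Floats: []promql.FPoint{
		{T: 1700000000000, F: 0.5},
		{T: 1700000060000, F: 0.9},
	},
}
ts := promSeriesToTimeSeries(series)
// ts.Labels now holds service_name and env as qbtypes.Label entries, and
// ts.Values carries two TimeSeriesValue points with millisecond timestamps,
// ready for Threshold.Eval just as in buildAndRunQuery above.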

View File

@@ -20,6 +20,7 @@ import (
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels" qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest" "github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/ruletypes" "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer" "github.com/SigNoz/signoz/pkg/valuer"
) )
@@ -47,9 +48,13 @@ func TestPromRuleEval(t *testing.T) {
RuleCondition: &ruletypes.RuleCondition{ RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{ CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL, QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{ Queries: []qbtypes.QueryEnvelope{
"A": { {
Query: "dummy_query", // This is not used in the test Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "A",
Query: "dummy_query", // This is not used in the test
},
}, },
}, },
}, },
@@ -62,7 +67,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp string compareOp string
matchType string matchType string
target float64 target float64
expectedAlertSample v3.Point expectedAlertSample float64
expectedVectorValues []float64 // Expected values in result vector expectedVectorValues []float64 // Expected values in result vector
}{ }{
// Test cases for Equals Always // Test cases for Equals Always
@@ -80,7 +85,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "3", // Equals compareOp: "3", // Equals
matchType: "2", // Always matchType: "2", // Always
target: 0.0, target: 0.0,
expectedAlertSample: v3.Point{Value: 0.0}, expectedAlertSample: 0.0,
expectedVectorValues: []float64{0.0}, expectedVectorValues: []float64{0.0},
}, },
{ {
@@ -145,7 +150,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "3", // Equals compareOp: "3", // Equals
matchType: "1", // Once matchType: "1", // Once
target: 0.0, target: 0.0,
expectedAlertSample: v3.Point{Value: 0.0}, expectedAlertSample: 0.0,
expectedVectorValues: []float64{0.0}, expectedVectorValues: []float64{0.0},
}, },
{ {
@@ -162,7 +167,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "3", // Equals compareOp: "3", // Equals
matchType: "1", // Once matchType: "1", // Once
target: 0.0, target: 0.0,
expectedAlertSample: v3.Point{Value: 0.0}, expectedAlertSample: 0.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -178,7 +183,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "3", // Equals compareOp: "3", // Equals
matchType: "1", // Once matchType: "1", // Once
target: 0.0, target: 0.0,
expectedAlertSample: v3.Point{Value: 0.0}, expectedAlertSample: 0.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -211,7 +216,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "1", // Greater Than compareOp: "1", // Greater Than
matchType: "2", // Always matchType: "2", // Always
target: 1.5, target: 1.5,
expectedAlertSample: v3.Point{Value: 2.0}, expectedAlertSample: 2.0,
expectedVectorValues: []float64{2.0}, expectedVectorValues: []float64{2.0},
}, },
{ {
@@ -228,7 +233,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "1", // Above compareOp: "1", // Above
matchType: "2", // Always matchType: "2", // Always
target: 2.0, target: 2.0,
expectedAlertSample: v3.Point{Value: 3.0}, expectedAlertSample: 3.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -244,7 +249,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "2", // Below compareOp: "2", // Below
matchType: "2", // Always matchType: "2", // Always
target: 13.0, target: 13.0,
expectedAlertSample: v3.Point{Value: 12.0}, expectedAlertSample: 12.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -276,7 +281,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "1", // Greater Than compareOp: "1", // Greater Than
matchType: "1", // Once matchType: "1", // Once
target: 4.5, target: 4.5,
expectedAlertSample: v3.Point{Value: 10.0}, expectedAlertSample: 10.0,
expectedVectorValues: []float64{10.0}, expectedVectorValues: []float64{10.0},
}, },
{ {
@@ -339,7 +344,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "4", // Not Equals compareOp: "4", // Not Equals
matchType: "2", // Always matchType: "2", // Always
target: 0.0, target: 0.0,
expectedAlertSample: v3.Point{Value: 1.0}, expectedAlertSample: 1.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -371,7 +376,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "4", // Not Equals compareOp: "4", // Not Equals
matchType: "1", // Once matchType: "1", // Once
target: 0.0, target: 0.0,
expectedAlertSample: v3.Point{Value: 1.0}, expectedAlertSample: 1.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -402,7 +407,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "4", // Not Equals compareOp: "4", // Not Equals
matchType: "1", // Once matchType: "1", // Once
target: 0.0, target: 0.0,
expectedAlertSample: v3.Point{Value: 1.0}, expectedAlertSample: 1.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -418,7 +423,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "4", // Not Equals compareOp: "4", // Not Equals
matchType: "1", // Once matchType: "1", // Once
target: 0.0, target: 0.0,
expectedAlertSample: v3.Point{Value: 1.0}, expectedAlertSample: 1.0,
}, },
// Test cases for Less Than Always // Test cases for Less Than Always
{ {
@@ -435,7 +440,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "2", // Less Than compareOp: "2", // Less Than
matchType: "2", // Always matchType: "2", // Always
target: 4, target: 4,
expectedAlertSample: v3.Point{Value: 1.5}, expectedAlertSample: 1.5,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -467,7 +472,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "2", // Less Than compareOp: "2", // Less Than
matchType: "1", // Once matchType: "1", // Once
target: 4, target: 4,
expectedAlertSample: v3.Point{Value: 2.5}, expectedAlertSample: 2.5,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -499,7 +504,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "3", // Equals compareOp: "3", // Equals
matchType: "3", // OnAverage matchType: "3", // OnAverage
target: 6.0, target: 6.0,
expectedAlertSample: v3.Point{Value: 6.0}, expectedAlertSample: 6.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -530,7 +535,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "4", // Not Equals compareOp: "4", // Not Equals
matchType: "3", // OnAverage matchType: "3", // OnAverage
target: 4.5, target: 4.5,
expectedAlertSample: v3.Point{Value: 6.0}, expectedAlertSample: 6.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -561,7 +566,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "1", // Greater Than compareOp: "1", // Greater Than
matchType: "3", // OnAverage matchType: "3", // OnAverage
target: 4.5, target: 4.5,
expectedAlertSample: v3.Point{Value: 6.0}, expectedAlertSample: 6.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -577,7 +582,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "2", // Less Than compareOp: "2", // Less Than
matchType: "3", // OnAverage matchType: "3", // OnAverage
target: 12.0, target: 12.0,
expectedAlertSample: v3.Point{Value: 6.0}, expectedAlertSample: 6.0,
}, },
// Test cases for InTotal // Test cases for InTotal
{ {
@@ -594,7 +599,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "3", // Equals compareOp: "3", // Equals
matchType: "4", // InTotal matchType: "4", // InTotal
target: 30.0, target: 30.0,
expectedAlertSample: v3.Point{Value: 30.0}, expectedAlertSample: 30.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -621,7 +626,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "4", // Not Equals compareOp: "4", // Not Equals
matchType: "4", // InTotal matchType: "4", // InTotal
target: 9.0, target: 9.0,
expectedAlertSample: v3.Point{Value: 10.0}, expectedAlertSample: 10.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -645,7 +650,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "1", // Greater Than compareOp: "1", // Greater Than
matchType: "4", // InTotal matchType: "4", // InTotal
target: 10.0, target: 10.0,
expectedAlertSample: v3.Point{Value: 20.0}, expectedAlertSample: 20.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -670,7 +675,7 @@ func TestPromRuleEval(t *testing.T) {
compareOp: "2", // Less Than compareOp: "2", // Less Than
matchType: "4", // InTotal matchType: "4", // InTotal
target: 30.0, target: 30.0,
expectedAlertSample: v3.Point{Value: 20.0}, expectedAlertSample: 20.0,
}, },
{ {
values: pql.Series{ values: pql.Series{
@@ -708,7 +713,7 @@ func TestPromRuleEval(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
} }
resultVectors, err := rule.Threshold.Eval(toCommonSeries(c.values), rule.Unit(), ruletypes.EvalData{}) resultVectors, err := rule.Threshold.Eval(*promSeriesToTimeSeries(c.values), rule.Unit(), ruletypes.EvalData{})
assert.NoError(t, err) assert.NoError(t, err)
// Compare full result vector with expected vector // Compare full result vector with expected vector
@@ -724,12 +729,12 @@ func TestPromRuleEval(t *testing.T) {
if len(resultVectors) > 0 { if len(resultVectors) > 0 {
found := false found := false
for _, sample := range resultVectors { for _, sample := range resultVectors {
if sample.V == c.expectedAlertSample.Value { if sample.V == c.expectedAlertSample {
found = true found = true
break break
} }
} }
assert.True(t, found, "Expected alert sample value %.2f not found in result vectors for case %d. Got values: %v", c.expectedAlertSample.Value, idx, actualValues) assert.True(t, found, "Expected alert sample value %.2f not found in result vectors for case %d. Got values: %v", c.expectedAlertSample, idx, actualValues)
} }
} else { } else {
assert.Empty(t, resultVectors, "Expected no alert but got result vectors for case %d", idx) assert.Empty(t, resultVectors, "Expected no alert but got result vectors for case %d", idx)
@@ -754,9 +759,13 @@ func TestPromRuleUnitCombinations(t *testing.T) {
RuleCondition: &ruletypes.RuleCondition{ RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{ CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL, QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{ Queries: []qbtypes.QueryEnvelope{
"A": { {
Query: "test_metric", Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "A",
Query: "test_metric",
},
}, },
}, },
}, },
@@ -1013,9 +1022,13 @@ func _Enable_this_after_9146_issue_fix_is_merged_TestPromRuleNoData(t *testing.T
RuleCondition: &ruletypes.RuleCondition{ RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{ CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL, QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{ Queries: []qbtypes.QueryEnvelope{
"A": { {
Query: "test_metric", Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "A",
Query: "test_metric",
},
}, },
}, },
}, },
@@ -1124,9 +1137,13 @@ func TestMultipleThresholdPromRule(t *testing.T) {
RuleCondition: &ruletypes.RuleCondition{ RuleCondition: &ruletypes.RuleCondition{
CompositeQuery: &v3.CompositeQuery{ CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL, QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{ Queries: []qbtypes.QueryEnvelope{
"A": { {
Query: "test_metric", Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{
Name: "A",
Query: "test_metric",
},
}, },
}, },
}, },
@@ -1361,8 +1378,11 @@ func TestPromRule_NoData(t *testing.T) {
MatchType: ruletypes.AtleastOnce, MatchType: ruletypes.AtleastOnce,
CompositeQuery: &v3.CompositeQuery{ CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL, QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{ Queries: []qbtypes.QueryEnvelope{
"A": {Query: "test_metric"}, {
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{Name: "A", Query: "test_metric"},
},
}, },
}, },
Thresholds: &ruletypes.RuleThresholdData{ Thresholds: &ruletypes.RuleThresholdData{
@@ -1486,8 +1506,11 @@ func TestPromRule_NoData_AbsentFor(t *testing.T) {
Target: &target, Target: &target,
CompositeQuery: &v3.CompositeQuery{ CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL, QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{ Queries: []qbtypes.QueryEnvelope{
"A": {Query: "test_metric"}, {
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{Name: "A", Query: "test_metric"},
},
}, },
}, },
Thresholds: &ruletypes.RuleThresholdData{ Thresholds: &ruletypes.RuleThresholdData{
@@ -1635,8 +1658,11 @@ func TestPromRuleEval_RequireMinPoints(t *testing.T) {
MatchType: ruletypes.AtleastOnce, MatchType: ruletypes.AtleastOnce,
CompositeQuery: &v3.CompositeQuery{ CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL, QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{ Queries: []qbtypes.QueryEnvelope{
"A": {Query: "test_metric"}, {
Type: qbtypes.QueryTypePromQL,
Spec: qbtypes.PromQuery{Name: "A", Query: "test_metric"},
},
}, },
}, },
}, },

View File

@@ -1,38 +1,24 @@
package rules package rules
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log/slog" "log/slog"
"math"
"net/url" "net/url"
"reflect"
"text/template"
"time" "time"
"github.com/SigNoz/signoz/pkg/contextlinks" "github.com/SigNoz/signoz/pkg/contextlinks"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/model" "github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/transition"
"github.com/SigNoz/signoz/pkg/types/ruletypes" "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes" "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer" "github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/query-service/app/querier"
querierV2 "github.com/SigNoz/signoz/pkg/query-service/app/querier/v2"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
"github.com/SigNoz/signoz/pkg/query-service/interfaces" "github.com/SigNoz/signoz/pkg/query-service/interfaces"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels" "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
querytemplate "github.com/SigNoz/signoz/pkg/query-service/utils/queryTemplate"
"github.com/SigNoz/signoz/pkg/query-service/utils/times" "github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp" "github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
logsv3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/formatter" "github.com/SigNoz/signoz/pkg/query-service/formatter"
querierV5 "github.com/SigNoz/signoz/pkg/querier" querierV5 "github.com/SigNoz/signoz/pkg/querier"
@@ -42,23 +28,9 @@ import (
type ThresholdRule struct { type ThresholdRule struct {
*BaseRule *BaseRule
// Ever since we introduced the new metrics query builder, the version is "v4"
// for all the rules
// if the version is "v3", then we use the old querier
// if the version is "v4", then we use the new querierV2
version string
// querier is used for alerts created before the introduction of new metrics query builder // querierV5 is the query builder v5 querier used for all alert rule evaluation
querier interfaces.Querier
// querierV2 is used for alerts created after the introduction of new metrics query builder
querierV2 interfaces.Querier
// querierV5 is used for alerts migrated after the introduction of new query builder
querierV5 querierV5.Querier querierV5 querierV5.Querier
// used for attribute metadata enrichment for logs and traces
logsKeys map[string]v3.AttributeKey
spansKeys map[string]v3.AttributeKey
} }
var _ Rule = (*ThresholdRule)(nil) var _ Rule = (*ThresholdRule)(nil)
@@ -82,25 +54,10 @@ func NewThresholdRule(
} }
t := ThresholdRule{ t := ThresholdRule{
BaseRule: baseRule, BaseRule: baseRule,
version: p.Version, querierV5: querierV5,
} }
querierOption := querier.QuerierOptions{
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
}
t.querier = querier.NewQuerier(querierOption)
t.querierV2 = querierV2.NewQuerier(querierOptsV2)
t.querierV5 = querierV5
t.reader = reader t.reader = reader
return &t, nil return &t, nil
} }
@@ -120,169 +77,9 @@ func (r *ThresholdRule) Type() ruletypes.RuleType {
return ruletypes.RuleTypeThreshold return ruletypes.RuleTypeThreshold
} }
func (r *ThresholdRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v3.QueryRangeParamsV3, error) { func (r *ThresholdRule) prepareQueryRange(ctx context.Context, ts time.Time) (*qbtypes.QueryRangeRequest, error) {
r.logger.InfoContext( r.logger.InfoContext(
ctx, "prepare query range request v4", "ts", ts.UnixMilli(), "eval_window", r.evalWindow.Milliseconds(), "eval_delay", r.evalDelay.Milliseconds(), ctx, "prepare query range request", "ts", ts.UnixMilli(), "eval_window", r.evalWindow.Milliseconds(), "eval_delay", r.evalDelay.Milliseconds(),
)
startTs, endTs := r.Timestamps(ts)
start, end := startTs.UnixMilli(), endTs.UnixMilli()
if r.ruleCondition.QueryType() == v3.QueryTypeClickHouseSQL {
params := &v3.QueryRangeParamsV3{
Start: start,
End: end,
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
CompositeQuery: &v3.CompositeQuery{
QueryType: r.ruleCondition.CompositeQuery.QueryType,
PanelType: r.ruleCondition.CompositeQuery.PanelType,
BuilderQueries: make(map[string]*v3.BuilderQuery),
ClickHouseQueries: make(map[string]*v3.ClickHouseQuery),
PromQueries: make(map[string]*v3.PromQuery),
Unit: r.ruleCondition.CompositeQuery.Unit,
},
Variables: make(map[string]interface{}),
NoCache: true,
}
querytemplate.AssignReservedVarsV3(params)
for name, chQuery := range r.ruleCondition.CompositeQuery.ClickHouseQueries {
if chQuery.Disabled {
continue
}
tmpl := template.New("clickhouse-query")
tmpl, err := tmpl.Parse(chQuery.Query)
if err != nil {
return nil, err
}
var query bytes.Buffer
err = tmpl.Execute(&query, params.Variables)
if err != nil {
return nil, err
}
params.CompositeQuery.ClickHouseQueries[name] = &v3.ClickHouseQuery{
Query: query.String(),
Disabled: chQuery.Disabled,
Legend: chQuery.Legend,
}
}
return params, nil
}
if r.ruleCondition.CompositeQuery != nil && r.ruleCondition.CompositeQuery.BuilderQueries != nil {
for _, q := range r.ruleCondition.CompositeQuery.BuilderQueries {
// If the step interval is less than the minimum allowed step interval, set it to the minimum allowed step interval
if minStep := common.MinAllowedStepInterval(start, end); q.StepInterval < minStep {
q.StepInterval = minStep
}
q.SetShiftByFromFunc()
if q.DataSource == v3.DataSourceMetrics {
// if the time range is greater than 1 day, and less than 1 week set the step interval to be multiple of 5 minutes
// if the time range is greater than 1 week, set the step interval to be multiple of 30 mins
if end-start >= 24*time.Hour.Milliseconds() && end-start < 7*24*time.Hour.Milliseconds() {
q.StepInterval = int64(math.Round(float64(q.StepInterval)/300)) * 300
} else if end-start >= 7*24*time.Hour.Milliseconds() {
q.StepInterval = int64(math.Round(float64(q.StepInterval)/1800)) * 1800
}
}
}
}
if r.ruleCondition.CompositeQuery.PanelType != v3.PanelTypeGraph {
r.ruleCondition.CompositeQuery.PanelType = v3.PanelTypeGraph
}
// default mode
return &v3.QueryRangeParamsV3{
Start: start,
End: end,
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
CompositeQuery: r.ruleCondition.CompositeQuery,
Variables: make(map[string]interface{}),
NoCache: true,
}, nil
}
func (r *ThresholdRule) prepareLinksToLogs(ctx context.Context, ts time.Time, lbls labels.Labels) string {
if r.version == "v5" {
return r.prepareLinksToLogsV5(ctx, ts, lbls)
}
selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRange(ctx, ts)
if err != nil {
return ""
}
start := time.UnixMilli(qr.Start)
end := time.UnixMilli(qr.End)
// TODO(srikanthccv): handle formula queries
if selectedQuery < "A" || selectedQuery > "Z" {
return ""
}
q := r.ruleCondition.CompositeQuery.BuilderQueries[selectedQuery]
if q == nil {
return ""
}
if q.DataSource != v3.DataSourceLogs {
return ""
}
queryFilter := []v3.FilterItem{}
if q.Filters != nil {
queryFilter = q.Filters.Items
}
filterItems := contextlinks.PrepareFilters(lbls.Map(), queryFilter, q.GroupBy, r.logsKeys)
return contextlinks.PrepareLinksToLogs(start, end, filterItems)
}
func (r *ThresholdRule) prepareLinksToTraces(ctx context.Context, ts time.Time, lbls labels.Labels) string {
if r.version == "v5" {
return r.prepareLinksToTracesV5(ctx, ts, lbls)
}
selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRange(ctx, ts)
if err != nil {
return ""
}
start := time.UnixMilli(qr.Start)
end := time.UnixMilli(qr.End)
// TODO(srikanthccv): handle formula queries
if selectedQuery < "A" || selectedQuery > "Z" {
return ""
}
q := r.ruleCondition.CompositeQuery.BuilderQueries[selectedQuery]
if q == nil {
return ""
}
if q.DataSource != v3.DataSourceTraces {
return ""
}
queryFilter := []v3.FilterItem{}
if q.Filters != nil {
queryFilter = q.Filters.Items
}
filterItems := contextlinks.PrepareFilters(lbls.Map(), queryFilter, q.GroupBy, r.spansKeys)
return contextlinks.PrepareLinksToTraces(start, end, filterItems)
}
func (r *ThresholdRule) prepareQueryRangeV5(ctx context.Context, ts time.Time) (*qbtypes.QueryRangeRequest, error) {
r.logger.InfoContext(
ctx, "prepare query range request v5", "ts", ts.UnixMilli(), "eval_window", r.evalWindow.Milliseconds(), "eval_delay", r.evalDelay.Milliseconds(),
) )
startTs, endTs := r.Timestamps(ts) startTs, endTs := r.Timestamps(ts)
@@ -302,10 +99,10 @@ func (r *ThresholdRule) prepareQueryRangeV5(ctx context.Context, ts time.Time) (
return req, nil return req, nil
} }
func (r *ThresholdRule) prepareLinksToLogsV5(ctx context.Context, ts time.Time, lbls labels.Labels) string { func (r *ThresholdRule) prepareLinksToLogs(ctx context.Context, ts time.Time, lbls labels.Labels) string {
selectedQuery := r.GetSelectedQuery() selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRangeV5(ctx, ts) qr, err := r.prepareQueryRange(ctx, ts)
if err != nil { if err != nil {
return "" return ""
} }
@@ -342,10 +139,10 @@ func (r *ThresholdRule) prepareLinksToLogsV5(ctx context.Context, ts time.Time,
return contextlinks.PrepareLinksToLogsV5(start, end, whereClause) return contextlinks.PrepareLinksToLogsV5(start, end, whereClause)
} }
func (r *ThresholdRule) prepareLinksToTracesV5(ctx context.Context, ts time.Time, lbls labels.Labels) string { func (r *ThresholdRule) prepareLinksToTraces(ctx context.Context, ts time.Time, lbls labels.Labels) string {
selectedQuery := r.GetSelectedQuery() selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRangeV5(ctx, ts) qr, err := r.prepareQueryRange(ctx, ts)
if err != nil { if err != nil {
return "" return ""
} }
@@ -391,115 +188,6 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = r.PopulateTemporality(ctx, orgID, params)
if err != nil {
return nil, fmt.Errorf("internal error while setting temporality")
}
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
hasLogsQuery := false
hasTracesQuery := false
for _, query := range params.CompositeQuery.BuilderQueries {
if query.DataSource == v3.DataSourceLogs {
hasLogsQuery = true
}
if query.DataSource == v3.DataSourceTraces {
hasTracesQuery = true
}
}
if hasLogsQuery {
// check if any enrichment is required for logs if yes then enrich them
if logsv3.EnrichmentRequired(params) {
logsFields, apiErr := r.reader.GetLogFieldsFromNames(ctx, logsv3.GetFieldNames(params.CompositeQuery))
if apiErr != nil {
return nil, apiErr.ToError()
}
logsKeys := model.GetLogFieldsV3(ctx, params, logsFields)
r.logsKeys = logsKeys
logsv3.Enrich(params, logsKeys)
}
}
if hasTracesQuery {
spanKeys, err := r.reader.GetSpanAttributeKeysByNames(ctx, logsv3.GetFieldNames(params.CompositeQuery))
if err != nil {
return nil, err
}
r.spansKeys = spanKeys
tracesV4.Enrich(params, spanKeys)
}
}
var results []*v3.Result
var queryErrors map[string]error
if r.version == "v4" {
results, queryErrors, err = r.querierV2.QueryRange(ctx, orgID, params)
} else {
results, queryErrors, err = r.querier.QueryRange(ctx, orgID, params)
}
if err != nil {
r.logger.ErrorContext(ctx, "failed to get alert query range result", "rule_name", r.Name(), "error", err, "query_errors", queryErrors)
return nil, fmt.Errorf("internal error while querying")
}
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
results, err = postprocess.PostProcessResult(results, params)
if err != nil {
r.logger.ErrorContext(ctx, "failed to post process result", "rule_name", r.Name(), "error", err)
return nil, fmt.Errorf("internal error while post processing")
}
}
selectedQuery := r.GetSelectedQuery()
var queryResult *v3.Result
for _, res := range results {
if res.QueryName == selectedQuery {
queryResult = res
break
}
}
hasData := queryResult != nil && len(queryResult.Series) > 0
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
return ruletypes.Vector{*missingDataAlert}, nil
}
var resultVector ruletypes.Vector
if queryResult == nil {
r.logger.WarnContext(ctx, "query result is nil", "rule_name", r.Name(), "query_name", selectedQuery)
return resultVector, nil
}
for _, series := range queryResult.Series {
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
SendUnmatched: r.ShouldSendUnmatched(),
})
if err != nil {
return nil, err
}
resultVector = append(resultVector, resultSeries...)
}
return resultVector, nil
}
func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRangeV5(ctx, ts)
if err != nil {
return nil, err
}
var results []*v3.Result
v5Result, err := r.querierV5.QueryRange(ctx, orgID, params) v5Result, err := r.querierV5.QueryRange(ctx, orgID, params)
if err != nil { if err != nil {
@@ -507,26 +195,24 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
return nil, fmt.Errorf("internal error while querying") return nil, fmt.Errorf("internal error while querying")
} }
for _, item := range v5Result.Data.Results {
if tsData, ok := item.(*qbtypes.TimeSeriesData); ok {
results = append(results, transition.ConvertV5TimeSeriesDataToV4Result(tsData))
} else {
// NOTE: should not happen but just to ensure we don't miss it if it happens for some reason
r.logger.WarnContext(ctx, "expected qbtypes.TimeSeriesData but got", "item_type", reflect.TypeOf(item))
}
}
selectedQuery := r.GetSelectedQuery() selectedQuery := r.GetSelectedQuery()
var queryResult *v3.Result var queryResult *qbtypes.TimeSeriesData
for _, res := range results { for _, item := range v5Result.Data.Results {
if res.QueryName == selectedQuery { if tsData, ok := item.(*qbtypes.TimeSeriesData); ok && tsData.QueryName == selectedQuery {
queryResult = res queryResult = tsData
break break
} }
} }
hasData := queryResult != nil && len(queryResult.Series) > 0 var allSeries []*qbtypes.TimeSeries
if queryResult != nil {
for _, bucket := range queryResult.Aggregations {
allSeries = append(allSeries, bucket.Series...)
}
}
hasData := len(allSeries) > 0
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil { if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
return ruletypes.Vector{*missingDataAlert}, nil return ruletypes.Vector{*missingDataAlert}, nil
} }
@@ -539,7 +225,7 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
} }
// Filter out new series if newGroupEvalDelay is configured // Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.Series seriesToProcess := allSeries
if r.ShouldSkipNewGroups() { if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess) filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
// In case of error we log the error and continue with the original series // In case of error we log the error and continue with the original series
@@ -552,7 +238,7 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
for _, series := range seriesToProcess { for _, series := range seriesToProcess {
if !r.Condition().ShouldEval(series) { if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints) r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Values), "requiredPoints", r.Condition().RequiredNumPoints)
continue continue
} }
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{ resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
@@ -573,16 +259,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
valueFormatter := formatter.FromUnit(r.Unit()) valueFormatter := formatter.FromUnit(r.Unit())
var res ruletypes.Vector res, err := r.buildAndRunQuery(ctx, r.orgID, ts)
var err error
if r.version == "v5" {
r.logger.InfoContext(ctx, "running v5 query")
res, err = r.buildAndRunQueryV5(ctx, r.orgID, ts)
} else {
r.logger.InfoContext(ctx, "running v4 query")
res, err = r.buildAndRunQuery(ctx, r.orgID, ts)
}
if err != nil { if err != nil {
return 0, err return 0, err
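With the v3/v4 querier paths removed, pulling evaluable series out of the v5 response is the only remaining branch in buildAndRunQuery. A condensed sketch of that step, assuming only the field names already used above (QueryName, Aggregations, Series); the helper name is illustrative and the response wrapper is elided:

package rules

import (
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)

// seriesForSelectedQuery mirrors the loop above: pick the TimeSeriesData whose
// QueryName matches the selected query and flatten all of its aggregation
// buckets into a single slice of series for threshold evaluation.
func seriesForSelectedQuery(results []any, selectedQuery string) []*qbtypes.TimeSeries {
	var all []*qbtypes.TimeSeries
	for _, item := range results {
		tsData, ok := item.(*qbtypes.TimeSeriesData)
		if !ok || tsData.QueryName != selectedQuery {
			continue
		}
		for _, bucket := range tsData.Aggregations {
			all = append(all, bucket.Series...)
		}
		break
	}
	return all
}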

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -169,6 +169,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewAddAnonymousPublicDashboardTransactionFactory(sqlstore), sqlmigration.NewAddAnonymousPublicDashboardTransactionFactory(sqlstore),
sqlmigration.NewAddRootUserFactory(sqlstore, sqlschema), sqlmigration.NewAddRootUserFactory(sqlstore, sqlschema),
sqlmigration.NewAddUserEmailOrgIDIndexFactory(sqlstore, sqlschema), sqlmigration.NewAddUserEmailOrgIDIndexFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateRulesV4ToV5Factory(sqlstore, telemetryStore),
) )
} }

View File

@@ -0,0 +1,194 @@
package sqlmigration
import (
"context"
"database/sql"
"encoding/json"
"log/slog"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/transition"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type migrateRulesV4ToV5 struct {
store sqlstore.SQLStore
telemetryStore telemetrystore.TelemetryStore
logger *slog.Logger
}
func NewMigrateRulesV4ToV5Factory(
store sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(
factory.MustNewName("migrate_rules_v4_to_v5"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return &migrateRulesV4ToV5{
store: store,
telemetryStore: telemetryStore,
logger: ps.Logger,
}, nil
})
}
func (migration *migrateRulesV4ToV5) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *migrateRulesV4ToV5) getLogDuplicateKeys(ctx context.Context) ([]string, error) {
query := `
SELECT name
FROM (
SELECT DISTINCT name FROM signoz_logs.distributed_logs_attribute_keys
INTERSECT
SELECT DISTINCT name FROM signoz_logs.distributed_logs_resource_keys
)
ORDER BY name
`
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
if err != nil {
migration.logger.WarnContext(ctx, "failed to query log duplicate keys", "error", err)
return nil, nil
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
migration.logger.WarnContext(ctx, "failed to scan log duplicate key", "error", err)
continue
}
keys = append(keys, key)
}
return keys, nil
}
func (migration *migrateRulesV4ToV5) getTraceDuplicateKeys(ctx context.Context) ([]string, error) {
query := `
SELECT tagKey
FROM signoz_traces.distributed_span_attributes_keys
WHERE tagType IN ('tag', 'resource')
GROUP BY tagKey
HAVING COUNT(DISTINCT tagType) > 1
ORDER BY tagKey
`
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
if err != nil {
migration.logger.WarnContext(ctx, "failed to query trace duplicate keys", "error", err)
return nil, nil
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
migration.logger.WarnContext(ctx, "failed to scan trace duplicate key", "error", err)
continue
}
keys = append(keys, key)
}
return keys, nil
}
func (migration *migrateRulesV4ToV5) Up(ctx context.Context, db *bun.DB) error {
logsKeys, err := migration.getLogDuplicateKeys(ctx)
if err != nil {
return err
}
tracesKeys, err := migration.getTraceDuplicateKeys(ctx)
if err != nil {
return err
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
var rules []struct {
ID string `bun:"id"`
Data map[string]any `bun:"data"`
}
err = tx.NewSelect().
Table("rule").
Column("id", "data").
Scan(ctx, &rules)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
alertsMigrator := transition.NewAlertMigrateV5(migration.logger, logsKeys, tracesKeys)
for _, rule := range rules {
version, _ := rule.Data["version"].(string)
if version == "v5" {
continue
}
migration.logger.InfoContext(ctx, "migrating rule v4 to v5", "rule_id", rule.ID, "current_version", version)
// Check if the queries envelope already exists and is non-empty
hasQueriesEnvelope := false
if condition, ok := rule.Data["condition"].(map[string]any); ok {
if compositeQuery, ok := condition["compositeQuery"].(map[string]any); ok {
if queries, ok := compositeQuery["queries"].([]any); ok && len(queries) > 0 {
hasQueriesEnvelope = true
}
}
}
if hasQueriesEnvelope {
// Case 2: Already has queries envelope, just bump version
migration.logger.InfoContext(ctx, "rule already has queries envelope, bumping version", "rule_id", rule.ID)
rule.Data["version"] = "v5"
} else {
// Case 1: Old format, run full migration
migration.logger.InfoContext(ctx, "rule has old format, running full migration", "rule_id", rule.ID)
alertsMigrator.Migrate(ctx, rule.Data)
// Force version to v5 regardless of Migrate return value
rule.Data["version"] = "v5"
}
dataJSON, err := json.Marshal(rule.Data)
if err != nil {
return err
}
_, err = tx.NewUpdate().
Table("rule").
Set("data = ?", string(dataJSON)).
Where("id = ?", rule.ID).
Exec(ctx)
if err != nil {
return err
}
}
return tx.Commit()
}
func (migration *migrateRulesV4ToV5) Down(ctx context.Context, db *bun.DB) error {
return nil
}
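For reference, the envelope check that drives the two migration cases above, distilled into a standalone helper (hypothetical name; same logic as in Up). Rules where it returns true only get their version bumped to "v5"; everything else goes through transition.NewAlertMigrateV5 first.

package sqlmigration

// hasQueriesEnvelope reports whether a stored rule's data already contains a
// non-empty compositeQuery.queries list, i.e. it is already in the v5 shape.
func hasQueriesEnvelope(data map[string]any) bool {
	condition, ok := data["condition"].(map[string]any)
	if !ok {
		return false
	}
	compositeQuery, ok := condition["compositeQuery"].(map[string]any)
	if !ok {
		return false
	}
	queries, ok := compositeQuery["queries"].([]any)
	return ok && len(queries) > 0
}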

View File

@@ -1,102 +0,0 @@
package transition
import (
"fmt"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
// ConvertV5TimeSeriesDataToV4Result converts v5 TimeSeriesData to v4 Result
func ConvertV5TimeSeriesDataToV4Result(v5Data *qbtypes.TimeSeriesData) *v3.Result {
if v5Data == nil {
return nil
}
result := &v3.Result{
QueryName: v5Data.QueryName,
Series: make([]*v3.Series, 0),
}
toV4Series := func(ts *qbtypes.TimeSeries) *v3.Series {
series := &v3.Series{
Labels: make(map[string]string),
LabelsArray: make([]map[string]string, 0),
Points: make([]v3.Point, 0, len(ts.Values)),
}
for _, label := range ts.Labels {
valueStr := fmt.Sprintf("%v", label.Value)
series.Labels[label.Key.Name] = valueStr
}
if len(series.Labels) > 0 {
series.LabelsArray = append(series.LabelsArray, series.Labels)
}
for _, tsValue := range ts.Values {
if tsValue.Partial {
continue
}
point := v3.Point{
Timestamp: tsValue.Timestamp,
Value: tsValue.Value,
}
series.Points = append(series.Points, point)
}
return series
}
for _, aggBucket := range v5Data.Aggregations {
for _, ts := range aggBucket.Series {
result.Series = append(result.Series, toV4Series(ts))
}
if len(aggBucket.AnomalyScores) != 0 {
result.AnomalyScores = make([]*v3.Series, 0)
for _, ts := range aggBucket.AnomalyScores {
result.AnomalyScores = append(result.AnomalyScores, toV4Series(ts))
}
}
if len(aggBucket.PredictedSeries) != 0 {
result.PredictedSeries = make([]*v3.Series, 0)
for _, ts := range aggBucket.PredictedSeries {
result.PredictedSeries = append(result.PredictedSeries, toV4Series(ts))
}
}
if len(aggBucket.LowerBoundSeries) != 0 {
result.LowerBoundSeries = make([]*v3.Series, 0)
for _, ts := range aggBucket.LowerBoundSeries {
result.LowerBoundSeries = append(result.LowerBoundSeries, toV4Series(ts))
}
}
if len(aggBucket.UpperBoundSeries) != 0 {
result.UpperBoundSeries = make([]*v3.Series, 0)
for _, ts := range aggBucket.UpperBoundSeries {
result.UpperBoundSeries = append(result.UpperBoundSeries, toV4Series(ts))
}
}
}
return result
}
// ConvertV5TimeSeriesDataSliceToV4Results converts a slice of v5 TimeSeriesData to v4 QueryRangeResponse
func ConvertV5TimeSeriesDataSliceToV4Results(v5DataSlice []*qbtypes.TimeSeriesData) *v3.QueryRangeResponse {
response := &v3.QueryRangeResponse{
ResultType: "matrix", // Time series data is typically "matrix" type
Result: make([]*v3.Result, 0, len(v5DataSlice)),
}
for _, v5Data := range v5DataSlice {
if result := ConvertV5TimeSeriesDataToV4Result(v5Data); result != nil {
response.Result = append(response.Result, result)
}
}
return response
}

View File

@@ -76,6 +76,33 @@ type TimeSeries struct {
Values []*TimeSeriesValue `json:"values"` Values []*TimeSeriesValue `json:"values"`
} }
// LabelsMap converts the label slice to a map[string]string for use in
// alert evaluation helpers that operate on flat label maps.
func (ts *TimeSeries) LabelsMap() map[string]string {
if ts == nil {
return nil
}
m := make(map[string]string, len(ts.Labels))
for _, l := range ts.Labels {
m[l.Key.Name] = fmt.Sprintf("%v", l.Value)
}
return m
}
// NonPartialValues returns only the values where Partial is false.
func (ts *TimeSeries) NonPartialValues() []*TimeSeriesValue {
if ts == nil {
return nil
}
result := make([]*TimeSeriesValue, 0, len(ts.Values))
for _, v := range ts.Values {
if !v.Partial {
result = append(result, v)
}
}
return result
}
type Label struct { type Label struct {
Key telemetrytypes.TelemetryFieldKey `json:"key"` Key telemetrytypes.TelemetryFieldKey `json:"key"`
Value any `json:"value"` Value any `json:"value"`
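A small usage sketch for the two helpers added above, with the types as declared in this file; the series values are illustrative only.

package main

import (
	"fmt"

	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)

func main() {
	ts := &qbtypes.TimeSeries{
		Labels: []*qbtypes.Label{
			{Key: telemetrytypes.TelemetryFieldKey{Name: "service"}, Value: "checkout"},
		},
		Values: []*qbtypes.TimeSeriesValue{
			{Timestamp: 1000, Value: 0.2},
			{Timestamp: 2000, Value: 0.4, Partial: true}, // trailing, still-filling bucket
		},
	}

	fmt.Println(ts.LabelsMap())             // map[service:checkout]
	fmt.Println(len(ts.NonPartialValues())) // 1: the partial value is dropped
}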

View File

@@ -203,11 +203,11 @@ func (rc *RuleCondition) IsValid() bool {
} }
// ShouldEval checks if the further series should be evaluated at all for alerts. // ShouldEval checks if the further series should be evaluated at all for alerts.
func (rc *RuleCondition) ShouldEval(series *v3.Series) bool { func (rc *RuleCondition) ShouldEval(series *qbtypes.TimeSeries) bool {
if rc == nil { if rc == nil {
return true return true
} }
return !rc.RequireMinPoints || len(series.Points) >= rc.RequiredNumPoints return !rc.RequireMinPoints || len(series.NonPartialValues()) >= rc.RequiredNumPoints
} }
// QueryType is a shorthand method to get query type // QueryType is a shorthand method to get query type
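A short standalone sketch of the behavioural change: the minimum-points guard now counts only non-partial values of the v5 series, so a series whose last bucket is still filling does not satisfy the minimum on its own.

package main

import (
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/ruletypes"
)

func main() {
	rc := &ruletypes.RuleCondition{RequireMinPoints: true, RequiredNumPoints: 2}

	series := &qbtypes.TimeSeries{
		Values: []*qbtypes.TimeSeriesValue{
			{Timestamp: 1000, Value: 1.0},
			{Timestamp: 2000, Value: 2.0, Partial: true}, // ignored by the guard
		},
	}

	_ = rc.ShouldEval(series) // false: only one complete point, two required
}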

View File

@@ -355,6 +355,14 @@ func (r *PostableRule) validate() error {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "composite query is required")) errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "composite query is required"))
} }
if r.Version != "" && r.Version != "v5" {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "only version v5 is supported, got %q", r.Version))
}
if r.Version == "v5" && r.RuleCondition.CompositeQuery != nil && len(r.RuleCondition.CompositeQuery.Queries) == 0 {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "queries envelope is required in compositeQuery"))
}
if isAllQueriesDisabled(r.RuleCondition.CompositeQuery) { if isAllQueriesDisabled(r.RuleCondition.CompositeQuery) {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "all queries are disabled in rule condition")) errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "all queries are disabled in rule condition"))
} }
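In practice this means a rule payload must either omit the version or set it to "v5", and a v5 rule must carry a non-empty queries envelope. A hedged sketch of a payload that passes both checks, with field names taken from the fixtures elsewhere in this diff:

package example

import (
	v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/ruletypes"
)

// validV5Rule builds a rule that satisfies the new validation: version is
// "v5" and the composite query carries a non-empty queries envelope.
// Version "v4" (or any non-"v5" value), or a v5 rule with an empty
// compositeQuery.queries list, would both be rejected.
func validV5Rule() ruletypes.PostableRule {
	return ruletypes.PostableRule{
		Version: "v5",
		RuleCondition: &ruletypes.RuleCondition{
			CompositeQuery: &v3.CompositeQuery{
				QueryType: v3.QueryTypePromQL,
				Queries: []qbtypes.QueryEnvelope{
					{Type: qbtypes.QueryTypePromQL, Spec: qbtypes.PromQuery{Name: "A", Query: "up"}},
				},
			},
		},
	}
}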

View File

@@ -8,6 +8,8 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
) )
func TestIsAllQueriesDisabled(t *testing.T) { func TestIsAllQueriesDisabled(t *testing.T) {
@@ -621,9 +623,9 @@ func TestParseIntoRuleThresholdGeneration(t *testing.T) {
} }
// Test that threshold can evaluate properly // Test that threshold can evaluate properly
vector, err := threshold.Eval(v3.Series{ vector, err := threshold.Eval(qbtypes.TimeSeries{
Points: []v3.Point{{Value: 0.15, Timestamp: 1000}}, // 150ms in seconds Values: []*qbtypes.TimeSeriesValue{{Value: 0.15, Timestamp: 1000}}, // 150ms in seconds
Labels: map[string]string{"test": "label"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "test"}, Value: "label"}},
}, "", EvalData{}) }, "", EvalData{})
if err != nil { if err != nil {
t.Fatalf("Unexpected error in shouldAlert: %v", err) t.Fatalf("Unexpected error in shouldAlert: %v", err)
@@ -698,9 +700,9 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
} }
// Test with a value that should trigger both WARNING and CRITICAL thresholds // Test with a value that should trigger both WARNING and CRITICAL thresholds
vector, err := threshold.Eval(v3.Series{ vector, err := threshold.Eval(qbtypes.TimeSeries{
Points: []v3.Point{{Value: 95.0, Timestamp: 1000}}, // 95% CPU usage Values: []*qbtypes.TimeSeriesValue{{Value: 95.0, Timestamp: 1000}}, // 95% CPU usage
Labels: map[string]string{"service": "test"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "service"}, Value: "test"}},
}, "", EvalData{}) }, "", EvalData{})
if err != nil { if err != nil {
t.Fatalf("Unexpected error in shouldAlert: %v", err) t.Fatalf("Unexpected error in shouldAlert: %v", err)
@@ -708,9 +710,9 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
assert.Equal(t, 2, len(vector)) assert.Equal(t, 2, len(vector))
vector, err = threshold.Eval(v3.Series{ vector, err = threshold.Eval(qbtypes.TimeSeries{
Points: []v3.Point{{Value: 75.0, Timestamp: 1000}}, // 75% CPU usage Values: []*qbtypes.TimeSeriesValue{{Value: 75.0, Timestamp: 1000}}, // 75% CPU usage
Labels: map[string]string{"service": "test"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "service"}, Value: "test"}},
}, "", EvalData{}) }, "", EvalData{})
if err != nil { if err != nil {
t.Fatalf("Unexpected error in shouldAlert: %v", err) t.Fatalf("Unexpected error in shouldAlert: %v", err)
@@ -723,7 +725,7 @@ func TestAnomalyNegationEval(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
ruleJSON []byte ruleJSON []byte
series v3.Series series qbtypes.TimeSeries
shouldAlert bool shouldAlert bool
expectedValue float64 expectedValue float64
}{ }{
@@ -751,9 +753,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A" "selectedQuery": "A"
} }
}`), }`),
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"host": "server1"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "host"}, Value: "server1"}},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1000, Value: -2.1}, // below & at least once, should alert {Timestamp: 1000, Value: -2.1}, // below & at least once, should alert
{Timestamp: 2000, Value: -2.3}, {Timestamp: 2000, Value: -2.3},
}, },
@@ -785,9 +787,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A" "selectedQuery": "A"
} }
}`), // below & at least once, no value below -2.0 }`), // below & at least once, no value below -2.0
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"host": "server1"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "host"}, Value: "server1"}},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1000, Value: -1.9}, {Timestamp: 1000, Value: -1.9},
{Timestamp: 2000, Value: -1.8}, {Timestamp: 2000, Value: -1.8},
}, },
@@ -818,9 +820,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A" "selectedQuery": "A"
} }
}`), // above & at least once, should alert }`), // above & at least once, should alert
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"host": "server1"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "host"}, Value: "server1"}},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1000, Value: 2.1}, // above 2.0, should alert {Timestamp: 1000, Value: 2.1}, // above 2.0, should alert
{Timestamp: 2000, Value: 2.2}, {Timestamp: 2000, Value: 2.2},
}, },
@@ -852,9 +854,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A" "selectedQuery": "A"
} }
}`), }`),
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"host": "server1"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "host"}, Value: "server1"}},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1000, Value: 1.1}, {Timestamp: 1000, Value: 1.1},
{Timestamp: 2000, Value: 1.2}, {Timestamp: 2000, Value: 1.2},
}, },
@@ -885,9 +887,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A" "selectedQuery": "A"
} }
}`), // below and all the times }`), // below and all the times
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"host": "server1"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "host"}, Value: "server1"}},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1000, Value: -2.1}, // all below -2 {Timestamp: 1000, Value: -2.1}, // all below -2
{Timestamp: 2000, Value: -2.2}, {Timestamp: 2000, Value: -2.2},
{Timestamp: 3000, Value: -2.5}, {Timestamp: 3000, Value: -2.5},
@@ -920,9 +922,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A" "selectedQuery": "A"
} }
}`), }`),
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"host": "server1"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "host"}, Value: "server1"}},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1000, Value: -3.0}, {Timestamp: 1000, Value: -3.0},
{Timestamp: 2000, Value: -1.0}, // above -2, breaks condition {Timestamp: 2000, Value: -1.0}, // above -2, breaks condition
{Timestamp: 3000, Value: -2.5}, {Timestamp: 3000, Value: -2.5},
@@ -954,10 +956,10 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A" "selectedQuery": "A"
} }
}`), }`),
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"host": "server1"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "host"}, Value: "server1"}},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1000, Value: -8.0}, // abs(8) >= 7, alert {Timestamp: 1000, Value: -8.0}, // abs(-8) >= 7, alert
{Timestamp: 2000, Value: 5.0}, {Timestamp: 2000, Value: 5.0},
}, },
}, },
@@ -988,9 +990,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A" "selectedQuery": "A"
} }
}`), }`),
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"host": "server1"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "host"}, Value: "server1"}},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1000, Value: 80.0}, // below 90, should alert {Timestamp: 1000, Value: 80.0}, // below 90, should alert
{Timestamp: 2000, Value: 85.0}, {Timestamp: 2000, Value: 85.0},
}, },
@@ -1022,9 +1024,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A" "selectedQuery": "A"
} }
}`), }`),
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"host": "server1"}, Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "host"}, Value: "server1"}},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1000, Value: 60.0}, // below, should alert {Timestamp: 1000, Value: 60.0}, // below, should alert
{Timestamp: 2000, Value: 90.0}, {Timestamp: 2000, Value: 90.0},
}, },

View File

@@ -8,8 +8,8 @@ import (
"github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/converter" "github.com/SigNoz/signoz/pkg/query-service/converter"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels" "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer" "github.com/SigNoz/signoz/pkg/valuer"
) )
@@ -83,7 +83,7 @@ func (eval EvalData) HasActiveAlert(sampleLabelFp uint64) bool {
type RuleThreshold interface { type RuleThreshold interface {
// Eval runs the given series through the threshold rules // Eval runs the given series through the threshold rules
// using the given EvalData and returns the matching series // using the given EvalData and returns the matching series
Eval(series v3.Series, unit string, evalData EvalData) (Vector, error) Eval(series qbtypes.TimeSeries, unit string, evalData EvalData) (Vector, error)
GetRuleReceivers() []RuleReceivers GetRuleReceivers() []RuleReceivers
} }
@@ -122,10 +122,11 @@ func (r BasicRuleThresholds) Validate() error {
return errors.Join(errs...) return errors.Join(errs...)
} }
func (r BasicRuleThresholds) Eval(series v3.Series, unit string, evalData EvalData) (Vector, error) { func (r BasicRuleThresholds) Eval(series qbtypes.TimeSeries, unit string, evalData EvalData) (Vector, error) {
var resultVector Vector var resultVector Vector
thresholds := []BasicRuleThreshold(r) thresholds := []BasicRuleThreshold(r)
sortThresholds(thresholds) sortThresholds(thresholds)
seriesLabels := series.LabelsMap()
for _, threshold := range thresholds { for _, threshold := range thresholds {
smpl, shouldAlert := threshold.shouldAlert(series, unit) smpl, shouldAlert := threshold.shouldAlert(series, unit)
if shouldAlert { if shouldAlert {
@@ -137,15 +138,15 @@ func (r BasicRuleThresholds) Eval(series v3.Series, unit string, evalData EvalDa
resultVector = append(resultVector, smpl) resultVector = append(resultVector, smpl)
continue continue
} else if evalData.SendUnmatched { } else if evalData.SendUnmatched {
// Sanitise the series points to remove any NaN or Inf values // Sanitise the series values to remove any NaN, Inf, or partial values
series.Points = removeGroupinSetPoints(series) values := filterValidValues(series.Values)
if len(series.Points) == 0 { if len(values) == 0 {
continue continue
} }
// prepare the sample with the first point of the series // prepare the sample with the first value of the series
smpl := Sample{ smpl := Sample{
Point: Point{T: series.Points[0].Timestamp, V: series.Points[0].Value}, Point: Point{T: values[0].Timestamp, V: values[0].Value},
Metric: PrepareSampleLabelsForRule(series.Labels, threshold.Name), Metric: PrepareSampleLabelsForRule(seriesLabels, threshold.Name),
Target: *threshold.TargetValue, Target: *threshold.TargetValue,
TargetUnit: threshold.TargetUnit, TargetUnit: threshold.TargetUnit,
} }
@@ -160,7 +161,7 @@ func (r BasicRuleThresholds) Eval(series v3.Series, unit string, evalData EvalDa
if threshold.RecoveryTarget == nil { if threshold.RecoveryTarget == nil {
continue continue
} }
sampleLabels := PrepareSampleLabelsForRule(series.Labels, threshold.Name) sampleLabels := PrepareSampleLabelsForRule(seriesLabels, threshold.Name)
alertHash := sampleLabels.Hash() alertHash := sampleLabels.Hash()
// check if alert is active and then check if recovery threshold matches // check if alert is active and then check if recovery threshold matches
if evalData.HasActiveAlert(alertHash) { if evalData.HasActiveAlert(alertHash) {
@@ -255,18 +256,23 @@ func (b BasicRuleThreshold) Validate() error {
return errors.Join(errs...) return errors.Join(errs...)
} }
func (b BasicRuleThreshold) matchesRecoveryThreshold(series v3.Series, ruleUnit string) (Sample, bool) { func (b BasicRuleThreshold) matchesRecoveryThreshold(series qbtypes.TimeSeries, ruleUnit string) (Sample, bool) {
return b.shouldAlertWithTarget(series, b.recoveryTarget(ruleUnit)) return b.shouldAlertWithTarget(series, b.recoveryTarget(ruleUnit))
} }
func (b BasicRuleThreshold) shouldAlert(series v3.Series, ruleUnit string) (Sample, bool) { func (b BasicRuleThreshold) shouldAlert(series qbtypes.TimeSeries, ruleUnit string) (Sample, bool) {
return b.shouldAlertWithTarget(series, b.target(ruleUnit)) return b.shouldAlertWithTarget(series, b.target(ruleUnit))
} }
func removeGroupinSetPoints(series v3.Series) []v3.Point { // filterValidValues returns only the values that are valid for alert evaluation:
var result []v3.Point // non-partial, non-NaN, non-Inf, and with non-negative timestamps.
for _, s := range series.Points { func filterValidValues(values []*qbtypes.TimeSeriesValue) []*qbtypes.TimeSeriesValue {
if s.Timestamp >= 0 && !math.IsNaN(s.Value) && !math.IsInf(s.Value, 0) { var result []*qbtypes.TimeSeriesValue
result = append(result, s) for _, v := range values {
if v.Partial {
continue
}
if v.Timestamp >= 0 && !math.IsNaN(v.Value) && !math.IsInf(v.Value, 0) {
result = append(result, v)
} }
} }
return result return result
@@ -284,15 +290,15 @@ func PrepareSampleLabelsForRule(seriesLabels map[string]string, thresholdName st
return lb.Labels() return lb.Labels()
} }
func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float64) (Sample, bool) { func (b BasicRuleThreshold) shouldAlertWithTarget(series qbtypes.TimeSeries, target float64) (Sample, bool) {
var shouldAlert bool var shouldAlert bool
var alertSmpl Sample var alertSmpl Sample
lbls := PrepareSampleLabelsForRule(series.Labels, b.Name) lbls := PrepareSampleLabelsForRule(series.LabelsMap(), b.Name)
series.Points = removeGroupinSetPoints(series) values := filterValidValues(series.Values)
// nothing to evaluate // nothing to evaluate
if len(series.Points) == 0 { if len(values) == 0 {
return alertSmpl, false return alertSmpl, false
} }
@@ -300,7 +306,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
case AtleastOnce: case AtleastOnce:
// If any sample matches the condition, the rule is firing. // If any sample matches the condition, the rule is firing.
if b.CompareOp == ValueIsAbove { if b.CompareOp == ValueIsAbove {
for _, smpl := range series.Points { for _, smpl := range values {
if smpl.Value > target { if smpl.Value > target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls} alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true shouldAlert = true
@@ -308,7 +314,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
} }
} }
} else if b.CompareOp == ValueIsBelow { } else if b.CompareOp == ValueIsBelow {
for _, smpl := range series.Points { for _, smpl := range values {
if smpl.Value < target { if smpl.Value < target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls} alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true shouldAlert = true
@@ -316,7 +322,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
} }
} }
} else if b.CompareOp == ValueIsEq { } else if b.CompareOp == ValueIsEq {
for _, smpl := range series.Points { for _, smpl := range values {
if smpl.Value == target { if smpl.Value == target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls} alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true shouldAlert = true
@@ -324,7 +330,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
} }
} }
} else if b.CompareOp == ValueIsNotEq { } else if b.CompareOp == ValueIsNotEq {
for _, smpl := range series.Points { for _, smpl := range values {
if smpl.Value != target { if smpl.Value != target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls} alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true shouldAlert = true
@@ -332,7 +338,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
} }
} }
} else if b.CompareOp == ValueOutsideBounds { } else if b.CompareOp == ValueOutsideBounds {
for _, smpl := range series.Points { for _, smpl := range values {
if math.Abs(smpl.Value) >= target { if math.Abs(smpl.Value) >= target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls} alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true shouldAlert = true
@@ -345,7 +351,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
shouldAlert = true shouldAlert = true
alertSmpl = Sample{Point: Point{V: target}, Metric: lbls} alertSmpl = Sample{Point: Point{V: target}, Metric: lbls}
if b.CompareOp == ValueIsAbove { if b.CompareOp == ValueIsAbove {
for _, smpl := range series.Points { for _, smpl := range values {
if smpl.Value <= target { if smpl.Value <= target {
shouldAlert = false shouldAlert = false
break break
@@ -354,7 +360,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
// use min value from the series // use min value from the series
if shouldAlert { if shouldAlert {
var minValue float64 = math.Inf(1) var minValue float64 = math.Inf(1)
for _, smpl := range series.Points { for _, smpl := range values {
if smpl.Value < minValue { if smpl.Value < minValue {
minValue = smpl.Value minValue = smpl.Value
} }
@@ -362,7 +368,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
alertSmpl = Sample{Point: Point{V: minValue}, Metric: lbls} alertSmpl = Sample{Point: Point{V: minValue}, Metric: lbls}
} }
} else if b.CompareOp == ValueIsBelow { } else if b.CompareOp == ValueIsBelow {
for _, smpl := range series.Points { for _, smpl := range values {
if smpl.Value >= target { if smpl.Value >= target {
shouldAlert = false shouldAlert = false
break break
@@ -370,7 +376,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
} }
if shouldAlert { if shouldAlert {
var maxValue float64 = math.Inf(-1) var maxValue float64 = math.Inf(-1)
for _, smpl := range series.Points { for _, smpl := range values {
if smpl.Value > maxValue { if smpl.Value > maxValue {
maxValue = smpl.Value maxValue = smpl.Value
} }
@@ -378,14 +384,14 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lbls} alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lbls}
} }
} else if b.CompareOp == ValueIsEq { } else if b.CompareOp == ValueIsEq {
for _, smpl := range series.Points { for _, smpl := range values {
if smpl.Value != target { if smpl.Value != target {
shouldAlert = false shouldAlert = false
break break
} }
} }
} else if b.CompareOp == ValueIsNotEq { } else if b.CompareOp == ValueIsNotEq {
for _, smpl := range series.Points { for _, smpl := range values {
if smpl.Value == target { if smpl.Value == target {
shouldAlert = false shouldAlert = false
break break
@@ -393,7 +399,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
} }
// use any non-inf or nan value from the series // use any non-inf or nan value from the series
if shouldAlert { if shouldAlert {
for _, smpl := range series.Points { for _, smpl := range values {
if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) { if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls} alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
break break
@@ -401,7 +407,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
} }
} }
} else if b.CompareOp == ValueOutsideBounds { } else if b.CompareOp == ValueOutsideBounds {
for _, smpl := range series.Points { for _, smpl := range values {
if math.Abs(smpl.Value) < target { if math.Abs(smpl.Value) < target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls} alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = false shouldAlert = false
@@ -412,7 +418,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
case OnAverage: case OnAverage:
// If the average of all samples matches the condition, the rule is firing. // If the average of all samples matches the condition, the rule is firing.
var sum, count float64 var sum, count float64
for _, smpl := range series.Points { for _, smpl := range values {
if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
continue continue
} }
@@ -446,7 +452,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
// If the sum of all samples matches the condition, the rule is firing. // If the sum of all samples matches the condition, the rule is firing.
var sum float64 var sum float64
for _, smpl := range series.Points { for _, smpl := range values {
if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
continue continue
} }
@@ -477,21 +483,22 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float
case Last: case Last:
// If the last sample matches the condition, the rule is firing. // If the last sample matches the condition, the rule is firing.
shouldAlert = false shouldAlert = false
alertSmpl = Sample{Point: Point{V: series.Points[len(series.Points)-1].Value}, Metric: lbls} lastValue := values[len(values)-1].Value
alertSmpl = Sample{Point: Point{V: lastValue}, Metric: lbls}
if b.CompareOp == ValueIsAbove { if b.CompareOp == ValueIsAbove {
if series.Points[len(series.Points)-1].Value > target { if lastValue > target {
shouldAlert = true shouldAlert = true
} }
} else if b.CompareOp == ValueIsBelow { } else if b.CompareOp == ValueIsBelow {
if series.Points[len(series.Points)-1].Value < target { if lastValue < target {
shouldAlert = true shouldAlert = true
} }
} else if b.CompareOp == ValueIsEq { } else if b.CompareOp == ValueIsEq {
if series.Points[len(series.Points)-1].Value == target { if lastValue == target {
shouldAlert = true shouldAlert = true
} }
} else if b.CompareOp == ValueIsNotEq { } else if b.CompareOp == ValueIsNotEq {
if series.Points[len(series.Points)-1].Value != target { if lastValue != target {
shouldAlert = true shouldAlert = true
} }
} }
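A sketch of an Eval call in the new shape, mirroring the unit-conversion tests that follow: the thresholds now take the v5 TimeSeries directly, and partial, NaN, and Inf values are filtered out before comparison. Written as an in-package fragment; the threshold fields shown are the ones referenced above.

package ruletypes

import (
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)

func exampleThresholdEval() (Vector, error) {
	target := 100.0 // 100ms threshold against a series reported in seconds
	thresholds := BasicRuleThresholds{
		{
			Name:        "critical",
			TargetValue: &target,
			TargetUnit:  "ms",
			MatchType:   AtleastOnce,
			CompareOp:   ValueIsAbove,
		},
	}

	series := qbtypes.TimeSeries{
		Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "service"}, Value: "api"}},
		Values: []*qbtypes.TimeSeriesValue{
			{Timestamp: 1000, Value: 0.15},                // 150ms, above the 100ms target
			{Timestamp: 2000, Value: 0.01, Partial: true}, // dropped by filterValidValues
		},
	}

	// one matching sample (value 0.15) is expected in the returned vector
	return thresholds.Eval(series, "s", EvalData{})
}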

View File

@@ -6,16 +6,24 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3" qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
) )
func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) { func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
target := 100.0 target := 100.0
makeLabel := func(name, value string) *qbtypes.Label {
return &qbtypes.Label{Key: telemetrytypes.TelemetryFieldKey{Name: name}, Value: value}
}
makeValue := func(value float64, ts int64) *qbtypes.TimeSeriesValue {
return &qbtypes.TimeSeriesValue{Value: value, Timestamp: ts}
}
tests := []struct { tests := []struct {
name string name string
threshold BasicRuleThreshold threshold BasicRuleThreshold
series v3.Series series qbtypes.TimeSeries
ruleUnit string ruleUnit string
shouldAlert bool shouldAlert bool
}{ }{
@@ -28,11 +36,9 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
MatchType: AtleastOnce, MatchType: AtleastOnce,
CompareOp: ValueIsAbove, CompareOp: ValueIsAbove,
}, },
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"service": "test"}, Labels: []*qbtypes.Label{makeLabel("service", "test")},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{makeValue(0.15, 1000)}, // 150ms in seconds
{Value: 0.15, Timestamp: 1000}, // 150ms in seconds
},
}, },
ruleUnit: "s", ruleUnit: "s",
shouldAlert: true, shouldAlert: true,
@@ -46,11 +52,9 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
MatchType: AtleastOnce, MatchType: AtleastOnce,
CompareOp: ValueIsAbove, CompareOp: ValueIsAbove,
}, },
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"service": "test"}, Labels: []*qbtypes.Label{makeLabel("service", "test")},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{makeValue(0.05, 1000)}, // 50ms in seconds
{Value: 0.05, Timestamp: 1000}, // 50ms in seconds
},
}, },
ruleUnit: "s", ruleUnit: "s",
shouldAlert: false, shouldAlert: false,
@@ -64,11 +68,9 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
MatchType: AtleastOnce, MatchType: AtleastOnce,
CompareOp: ValueIsAbove, CompareOp: ValueIsAbove,
}, },
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"service": "test"}, Labels: []*qbtypes.Label{makeLabel("service", "test")},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{makeValue(150000, 1000)}, // 150000ms = 150s
{Value: 150000, Timestamp: 1000}, // 150000ms = 150s
},
}, },
ruleUnit: "ms", ruleUnit: "ms",
shouldAlert: true, shouldAlert: true,
@@ -83,11 +85,9 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
MatchType: AtleastOnce, MatchType: AtleastOnce,
CompareOp: ValueIsAbove, CompareOp: ValueIsAbove,
}, },
series: v3.Series{ series: qbtypes.TimeSeries{
Labels: map[string]string{"service": "test"}, Labels: []*qbtypes.Label{makeLabel("service", "test")},
Points: []v3.Point{ Values: []*qbtypes.TimeSeriesValue{makeValue(0.15, 1000)}, // 0.15KiB ≈ 153.6 bytes
{Value: 0.15, Timestamp: 1000}, // 0.15KiB ≈ 153.6 bytes
},
}, },
ruleUnit: "kbytes", ruleUnit: "kbytes",
shouldAlert: true, shouldAlert: true,
@@ -101,11 +101,9 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
				MatchType: AtleastOnce,
				CompareOp: ValueIsAbove,
			},
-			series: v3.Series{
-				Labels: map[string]string{"service": "test"},
-				Points: []v3.Point{
-					{Value: 0.15, Timestamp: 1000},
-				},
+			series: qbtypes.TimeSeries{
+				Labels: []*qbtypes.Label{makeLabel("service", "test")},
+				Values: []*qbtypes.TimeSeriesValue{makeValue(0.15, 1000)},
			},
			ruleUnit:    "mbytes",
			shouldAlert: true,
@@ -120,11 +118,9 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
				MatchType: AtleastOnce,
				CompareOp: ValueIsBelow,
			},
-			series: v3.Series{
-				Labels: map[string]string{"service": "test"},
-				Points: []v3.Point{
-					{Value: 0.05, Timestamp: 1000}, // 50ms in seconds
-				},
+			series: qbtypes.TimeSeries{
+				Labels: []*qbtypes.Label{makeLabel("service", "test")},
+				Values: []*qbtypes.TimeSeriesValue{makeValue(0.05, 1000)}, // 50ms in seconds
			},
			ruleUnit:    "s",
			shouldAlert: true,
@@ -138,12 +134,12 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
				MatchType: OnAverage,
				CompareOp: ValueIsAbove,
			},
-			series: v3.Series{
-				Labels: map[string]string{"service": "test"},
-				Points: []v3.Point{
-					{Value: 0.08, Timestamp: 1000}, // 80ms
-					{Value: 0.12, Timestamp: 2000}, // 120ms
-					{Value: 0.15, Timestamp: 3000}, // 150ms
+			series: qbtypes.TimeSeries{
+				Labels: []*qbtypes.Label{makeLabel("service", "test")},
+				Values: []*qbtypes.TimeSeriesValue{
+					makeValue(0.08, 1000), // 80ms
+					makeValue(0.12, 2000), // 120ms
+					makeValue(0.15, 3000), // 150ms
				},
			},
			ruleUnit:    "s",
@@ -158,12 +154,12 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
				MatchType: InTotal,
				CompareOp: ValueIsAbove,
			},
-			series: v3.Series{
-				Labels: map[string]string{"service": "test"},
-				Points: []v3.Point{
-					{Value: 0.04, Timestamp: 1000}, // 40MB
-					{Value: 0.05, Timestamp: 2000}, // 50MB
-					{Value: 0.03, Timestamp: 3000}, // 30MB
+			series: qbtypes.TimeSeries{
+				Labels: []*qbtypes.Label{makeLabel("service", "test")},
+				Values: []*qbtypes.TimeSeriesValue{
+					makeValue(0.04, 1000), // 40MB
+					makeValue(0.05, 2000), // 50MB
+					makeValue(0.03, 3000), // 30MB
				},
			},
			ruleUnit:    "decgbytes",
@@ -178,12 +174,12 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
				MatchType: AllTheTimes,
				CompareOp: ValueIsAbove,
			},
-			series: v3.Series{
-				Labels: map[string]string{"service": "test"},
-				Points: []v3.Point{
-					{Value: 0.11, Timestamp: 1000}, // 110ms
-					{Value: 0.12, Timestamp: 2000}, // 120ms
-					{Value: 0.15, Timestamp: 3000}, // 150ms
+			series: qbtypes.TimeSeries{
+				Labels: []*qbtypes.Label{makeLabel("service", "test")},
+				Values: []*qbtypes.TimeSeriesValue{
+					makeValue(0.11, 1000), // 110ms
+					makeValue(0.12, 2000), // 120ms
+					makeValue(0.15, 3000), // 150ms
				},
			},
			ruleUnit:    "s",
@@ -198,11 +194,11 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
				MatchType: Last,
				CompareOp: ValueIsAbove,
			},
-			series: v3.Series{
-				Labels: map[string]string{"service": "test"},
-				Points: []v3.Point{
-					{Value: 0.15, Timestamp: 1000}, // 150kB
-					{Value: 0.05, Timestamp: 2000}, // 50kB (last value)
+			series: qbtypes.TimeSeries{
+				Labels: []*qbtypes.Label{makeLabel("service", "test")},
+				Values: []*qbtypes.TimeSeriesValue{
+					makeValue(0.15, 1000), // 150kB
+					makeValue(0.05, 2000), // 50kB (last value)
				},
			},
			ruleUnit:    "decmbytes",
@@ -218,11 +214,9 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
				MatchType: AtleastOnce,
				CompareOp: ValueIsAbove,
			},
-			series: v3.Series{
-				Labels: map[string]string{"service": "test"},
-				Points: []v3.Point{
-					{Value: 0.15, Timestamp: 1000},
-				},
+			series: qbtypes.TimeSeries{
+				Labels: []*qbtypes.Label{makeLabel("service", "test")},
+				Values: []*qbtypes.TimeSeriesValue{makeValue(0.15, 1000)},
			},
			ruleUnit:    "KBs",
			shouldAlert: true,
@@ -237,11 +231,9 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
				MatchType: AtleastOnce,
				CompareOp: ValueIsAbove,
			},
-			series: v3.Series{
-				Labels: map[string]string{"service": "test"},
-				Points: []v3.Point{
-					{Value: 150, Timestamp: 1000}, // 150ms
-				},
+			series: qbtypes.TimeSeries{
+				Labels: []*qbtypes.Label{makeLabel("service", "test")},
+				Values: []*qbtypes.TimeSeriesValue{makeValue(150, 1000)}, // 150ms
			},
			ruleUnit:    "ms",
			shouldAlert: true,
@@ -256,11 +248,9 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
				MatchType: AtleastOnce,
				CompareOp: ValueIsAbove,
			},
-			series: v3.Series{
-				Labels: map[string]string{"service": "test"},
-				Points: []v3.Point{
-					{Value: 150, Timestamp: 1000}, // 150 (unitless)
-				},
+			series: qbtypes.TimeSeries{
+				Labels: []*qbtypes.Label{makeLabel("service", "test")},
+				Values: []*qbtypes.TimeSeriesValue{makeValue(150, 1000)}, // 150 (unitless)
			},
			ruleUnit:    "",
			shouldAlert: true,
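All of these fixtures exercise the same conversion idea: a sample arrives in the rule's unit (ruleUnit) and is normalized before being compared with the target of 100, whose unit is implied by the inline comments (milliseconds, bytes, megabytes, and so on). A rough worked sketch of that arithmetic follows; the multiplications stand in for SigNoz's unit converter, and the target units are inferred from the comments rather than taken from the code.

package main

import "fmt"

func main() {
	// ruleUnit "s", target 100 treated as ms: 0.15 s -> 150 ms, above the target.
	fmt.Println(0.15*1000 > 100) // true

	// ruleUnit "kbytes" (KiB), target 100 treated as bytes: 0.15 KiB -> 153.6 bytes.
	fmt.Println(0.15*1024 > 100) // true

	// ruleUnit "decgbytes" with InTotal, target 100 treated as MB:
	// 0.04 + 0.05 + 0.03 GB sum to 0.12 GB -> 120 MB.
	fmt.Println((0.04+0.05+0.03)*1000 > 100) // true
}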