Compare commits

..

15 Commits

Author SHA1 Message Date
Piyush Singariya
cb71c9c3f7 Merge branch 'main' into fix/array-json 2026-03-31 15:42:09 +05:30
Piyush Singariya
1cd4ce6509 Merge branch 'main' into fix/array-json 2026-03-31 14:55:36 +05:30
Piyush Singariya
9299c8ab18 fix: indexed unit tests 2026-03-30 15:47:47 +05:30
Piyush Singariya
24749de269 fix: comment 2026-03-30 15:16:28 +05:30
Piyush Singariya
39098ec3f4 fix: unit tests 2026-03-30 15:12:17 +05:30
Piyush Singariya
fe554f5c94 fix: remove not used paths from testdata 2026-03-30 14:24:48 +05:30
Piyush Singariya
8a60a041a6 fix: unit tests 2026-03-30 14:14:49 +05:30
Piyush Singariya
541f19c34a fix: array type filtering from dynamic arrays 2026-03-30 12:59:31 +05:30
Piyush Singariya
010db03d6e fix: indexed tests passing 2026-03-30 12:24:26 +05:30
Piyush Singariya
5408acbd8c fix: primitive conditions working 2026-03-30 12:01:35 +05:30
Piyush Singariya
0de6c85f81 feat: align negative operators to include other logs 2026-03-28 10:30:11 +05:30
Piyush Singariya
69ec24fa05 test: fix unit tests 2026-03-27 15:12:49 +05:30
Piyush Singariya
539d732b65 fix: contextual path index usage 2026-03-27 14:44:51 +05:30
Piyush Singariya
843d5fb199 Merge branch 'main' into feat/json-index 2026-03-27 14:17:52 +05:30
Piyush Singariya
fabdfb8cc1 feat: enable JSON Path index 2026-03-27 14:07:37 +05:30
57 changed files with 5420 additions and 4614 deletions

View File

@@ -49,6 +49,7 @@ import (
opAmpModel "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
baseconst "github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/healthcheck"
baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces"
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
"github.com/SigNoz/signoz/pkg/query-service/utils"
)
@@ -98,6 +99,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
)
rm, err := makeRulesManager(
reader,
signoz.Cache,
signoz.Alertmanager,
signoz.SQLStore,
@@ -343,7 +345,7 @@ func (s *Server) Stop(ctx context.Context) error {
return nil
}
func makeRulesManager(cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, metadataStore telemetrytypes.MetadataStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, ruleStateHistoryModule rulestatehistory.Module, querier querier.Querier, providerSettings factory.ProviderSettings, queryParser queryparser.QueryParser) (*baserules.Manager, error) {
func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, metadataStore telemetrytypes.MetadataStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, ruleStateHistoryModule rulestatehistory.Module, querier querier.Querier, providerSettings factory.ProviderSettings, queryParser queryparser.QueryParser) (*baserules.Manager, error) {
ruleStore := sqlrulestore.NewRuleStore(sqlstore, queryParser, providerSettings)
maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
// create manager opts
@@ -352,6 +354,7 @@ func makeRulesManager(cache cache.Cache, alertmanager alertmanager.Alertmanager,
MetadataStore: metadataStore,
Prometheus: prometheus,
Context: context.Background(),
Reader: ch,
Querier: querier,
Logger: providerSettings.Logger,
Cache: cache,
@@ -362,7 +365,7 @@ func makeRulesManager(cache cache.Cache, alertmanager alertmanager.Alertmanager,
OrgGetter: orgGetter,
RuleStore: ruleStore,
MaintenanceStore: maintenanceStore,
SQLStore: sqlstore,
SqlStore: sqlstore,
QueryParser: queryParser,
RuleStateHistoryModule: ruleStateHistoryModule,
}

View File

@@ -5,30 +5,41 @@ import (
"encoding/json"
"fmt"
"log/slog"
"math"
"strings"
"sync"
"time"
"github.com/SigNoz/signoz/ee/query-service/anomaly"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/types/rulestatehistorytypes"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/transition"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
querierV2 "github.com/SigNoz/signoz/pkg/query-service/app/querier/v2"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
"github.com/SigNoz/signoz/pkg/units"
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
querierV5 "github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/ee/anomaly"
anomalyV2 "github.com/SigNoz/signoz/ee/anomaly"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
var (
RuleTypeAnomaly = ruletypes.RuleType{String: valuer.NewString("anomaly_rule")}
const (
RuleTypeAnomaly = "anomaly_rule"
)
type AnomalyRule struct {
@@ -36,10 +47,16 @@ type AnomalyRule struct {
mtx sync.Mutex
// querierV5 is used for alerts migrated after the introduction of new query builder
querier querierV5.Querier
reader interfaces.Reader
provider anomaly.Provider
// querierV2 is used for alerts created after the introduction of new metrics query builder
querierV2 interfaces.Querier
// querierV5 is used for alerts migrated after the introduction of new query builder
querierV5 querierV5.Querier
provider anomaly.Provider
providerV2 anomalyV2.Provider
version string
logger *slog.Logger
@@ -53,7 +70,8 @@ func NewAnomalyRule(
id string,
orgID valuer.UUID,
p *ruletypes.PostableRule,
querier querier.Querier,
reader interfaces.Reader,
querierV5 querierV5.Querier,
logger *slog.Logger,
cache cache.Cache,
opts ...baserules.RuleOption,
@@ -63,7 +81,7 @@ func NewAnomalyRule(
opts = append(opts, baserules.WithLogger(logger))
baseRule, err := baserules.NewBaseRule(id, orgID, p, opts...)
baseRule, err := baserules.NewBaseRule(id, orgID, p, reader, opts...)
if err != nil {
return nil, err
}
@@ -83,26 +101,54 @@ func NewAnomalyRule(
t.seasonality = anomaly.SeasonalityDaily
}
logger.Info("using seasonality", "seasonality", t.seasonality)
logger.Info("using seasonality", "seasonality", t.seasonality.String())
querierOptsV2 := querierV2.QuerierOptions{
Reader: reader,
Cache: cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
}
t.querierV2 = querierV2.NewQuerier(querierOptsV2)
t.reader = reader
if t.seasonality == anomaly.SeasonalityHourly {
t.provider = anomaly.NewHourlyProvider(
anomaly.WithQuerier[*anomaly.HourlyProvider](querier),
anomaly.WithLogger[*anomaly.HourlyProvider](logger),
anomaly.WithCache[*anomaly.HourlyProvider](cache),
anomaly.WithKeyGenerator[*anomaly.HourlyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.HourlyProvider](reader),
)
} else if t.seasonality == anomaly.SeasonalityDaily {
t.provider = anomaly.NewDailyProvider(
anomaly.WithQuerier[*anomaly.DailyProvider](querier),
anomaly.WithLogger[*anomaly.DailyProvider](logger),
anomaly.WithCache[*anomaly.DailyProvider](cache),
anomaly.WithKeyGenerator[*anomaly.DailyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.DailyProvider](reader),
)
} else if t.seasonality == anomaly.SeasonalityWeekly {
t.provider = anomaly.NewWeeklyProvider(
anomaly.WithQuerier[*anomaly.WeeklyProvider](querier),
anomaly.WithLogger[*anomaly.WeeklyProvider](logger),
anomaly.WithCache[*anomaly.WeeklyProvider](cache),
anomaly.WithKeyGenerator[*anomaly.WeeklyProvider](queryBuilder.NewKeyGenerator()),
anomaly.WithReader[*anomaly.WeeklyProvider](reader),
)
}
t.querier = querier
if t.seasonality == anomaly.SeasonalityHourly {
t.providerV2 = anomalyV2.NewHourlyProvider(
anomalyV2.WithQuerier[*anomalyV2.HourlyProvider](querierV5),
anomalyV2.WithLogger[*anomalyV2.HourlyProvider](logger),
)
} else if t.seasonality == anomaly.SeasonalityDaily {
t.providerV2 = anomalyV2.NewDailyProvider(
anomalyV2.WithQuerier[*anomalyV2.DailyProvider](querierV5),
anomalyV2.WithLogger[*anomalyV2.DailyProvider](logger),
)
} else if t.seasonality == anomaly.SeasonalityWeekly {
t.providerV2 = anomalyV2.NewWeeklyProvider(
anomalyV2.WithQuerier[*anomalyV2.WeeklyProvider](querierV5),
anomalyV2.WithLogger[*anomalyV2.WeeklyProvider](logger),
)
}
t.querierV5 = querierV5
t.version = p.Version
t.logger = logger
return &t, nil
@@ -112,7 +158,34 @@ func (r *AnomalyRule) Type() ruletypes.RuleType {
return RuleTypeAnomaly
}
func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) (*qbtypes.QueryRangeRequest, error) {
func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v3.QueryRangeParamsV3, error) {
r.logger.InfoContext(
ctx, "prepare query range request v4", "ts", ts.UnixMilli(), "eval_window", r.EvalWindow().Milliseconds(), "eval_delay", r.EvalDelay().Milliseconds(),
)
st, en := r.Timestamps(ts)
start := st.UnixMilli()
end := en.UnixMilli()
compositeQuery := r.Condition().CompositeQuery
if compositeQuery.PanelType != v3.PanelTypeGraph {
compositeQuery.PanelType = v3.PanelTypeGraph
}
// default mode
return &v3.QueryRangeParamsV3{
Start: start,
End: end,
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
CompositeQuery: compositeQuery,
Variables: make(map[string]interface{}, 0),
NoCache: false,
}, nil
}
func (r *AnomalyRule) prepareQueryRangeV5(ctx context.Context, ts time.Time) (*qbtypes.QueryRangeRequest, error) {
r.logger.InfoContext(ctx, "prepare query range request v5", "ts", ts.UnixMilli(), "eval_window", r.EvalWindow().Milliseconds(), "eval_delay", r.EvalDelay().Milliseconds())
@@ -134,7 +207,7 @@ func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) (*qbt
}
func (r *AnomalyRule) GetSelectedQuery() string {
return r.Condition().SelectedQueryName()
return r.Condition().GetSelectedQueryName()
}
func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
@@ -143,16 +216,20 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
if err != nil {
return nil, err
}
err = r.PopulateTemporality(ctx, orgID, params)
if err != nil {
return nil, fmt.Errorf("internal error while setting temporality")
}
anomalies, err := r.provider.GetAnomalies(ctx, orgID, &anomaly.AnomaliesRequest{
Params: *params,
anomalies, err := r.provider.GetAnomalies(ctx, orgID, &anomaly.GetAnomaliesRequest{
Params: params,
Seasonality: r.seasonality,
})
if err != nil {
return nil, err
}
var queryResult *qbtypes.TimeSeriesData
var queryResult *v3.Result
for _, result := range anomalies.Results {
if result.QueryName == r.GetSelectedQuery() {
queryResult = result
@@ -160,26 +237,74 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
}
}
if queryResult == nil {
r.logger.WarnContext(ctx, "nil qb result", "ts", ts.UnixMilli())
return ruletypes.Vector{}, nil
}
hasData := len(queryResult.Aggregations) > 0 &&
queryResult.Aggregations[0] != nil &&
len(queryResult.Aggregations[0].AnomalyScores) > 0
hasData := len(queryResult.AnomalyScores) > 0
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
return ruletypes.Vector{*missingDataAlert}, nil
}
var resultVector ruletypes.Vector
scoresJSON, _ := json.Marshal(queryResult.Aggregations[0].AnomalyScores)
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores)
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
for _, series := range queryResult.AnomalyScores {
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
results, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
SendUnmatched: r.ShouldSendUnmatched(),
})
if err != nil {
return nil, err
}
resultVector = append(resultVector, results...)
}
return resultVector, nil
}
func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRangeV5(ctx, ts)
if err != nil {
return nil, err
}
anomalies, err := r.providerV2.GetAnomalies(ctx, orgID, &anomalyV2.AnomaliesRequest{
Params: *params,
Seasonality: anomalyV2.Seasonality{String: valuer.NewString(r.seasonality.String())},
})
if err != nil {
return nil, err
}
var qbResult *qbtypes.TimeSeriesData
for _, result := range anomalies.Results {
if result.QueryName == r.GetSelectedQuery() {
qbResult = result
break
}
}
if qbResult == nil {
r.logger.WarnContext(ctx, "nil qb result", "ts", ts.UnixMilli())
}
queryResult := transition.ConvertV5TimeSeriesDataToV4Result(qbResult)
hasData := len(queryResult.AnomalyScores) > 0
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
return ruletypes.Vector{*missingDataAlert}, nil
}
var resultVector ruletypes.Vector
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores)
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
// Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.Aggregations[0].AnomalyScores
seriesToProcess := queryResult.AnomalyScores
if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
// In case of error we log the error and continue with the original series
@@ -192,10 +317,10 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
for _, series := range seriesToProcess {
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Values), "requiredPoints", r.Condition().RequiredNumPoints)
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
results, err := r.Threshold.Eval(series, r.Unit(), ruletypes.EvalData{
results, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
SendUnmatched: r.ShouldSendUnmatched(),
})
@@ -216,9 +341,13 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
var res ruletypes.Vector
var err error
r.logger.InfoContext(ctx, "running v5 query")
res, err = r.buildAndRunQuery(ctx, r.OrgID(), ts)
if r.version == "v5" {
r.logger.InfoContext(ctx, "running v5 query")
res, err = r.buildAndRunQueryV5(ctx, r.OrgID(), ts)
} else {
r.logger.InfoContext(ctx, "running v4 query")
res, err = r.buildAndRunQuery(ctx, r.OrgID(), ts)
}
if err != nil {
return 0, err
}
@@ -257,6 +386,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
defs+text,
"__alert_"+r.Name(),
tmplData,
times.Time(timestamp.FromTime(ts)),
nil,
)
result, err := tmpl.Expand()
@@ -267,24 +397,24 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
return result
}
lb := ruletypes.NewBuilder(smpl.Metric...).Del(ruletypes.MetricNameLabel).Del(ruletypes.TemporalityLabel)
resultLabels := ruletypes.NewBuilder(smpl.Metric...).Del(ruletypes.MetricNameLabel).Del(ruletypes.TemporalityLabel).Labels()
lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel)
resultLabels := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel).Labels()
for name, value := range r.Labels().Map() {
lb.Set(name, expand(value))
}
lb.Set(ruletypes.AlertNameLabel, r.Name())
lb.Set(ruletypes.AlertRuleIDLabel, r.ID())
lb.Set(ruletypes.RuleSourceLabel, r.GeneratorURL())
lb.Set(labels.AlertNameLabel, r.Name())
lb.Set(labels.AlertRuleIdLabel, r.ID())
lb.Set(labels.RuleSourceLabel, r.GeneratorURL())
annotations := make(ruletypes.Labels, 0, len(r.Annotations().Map()))
annotations := make(labels.Labels, 0, len(r.Annotations().Map()))
for name, value := range r.Annotations().Map() {
annotations = append(annotations, ruletypes.Label{Name: name, Value: expand(value)})
annotations = append(annotations, labels.Label{Name: name, Value: expand(value)})
}
if smpl.IsMissing {
lb.Set(ruletypes.AlertNameLabel, "[No data] "+r.Name())
lb.Set(ruletypes.NoDataLabel, "true")
lb.Set(labels.AlertNameLabel, "[No data] "+r.Name())
lb.Set(labels.NoDataLabel, "true")
}
lbs := lb.Labels()
@@ -293,16 +423,16 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
if _, ok := alerts[h]; ok {
r.logger.ErrorContext(ctx, "the alert query returns duplicate records", "rule_id", r.ID(), "alert", alerts[h])
err = errors.NewInternalf(errors.CodeInternal, "duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
err = fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
return 0, err
}
alerts[h] = &ruletypes.Alert{
Labels: lbs,
QueryResultLabels: resultLabels,
QueryResultLables: resultLabels,
Annotations: annotations,
ActiveAt: ts,
State: ruletypes.StatePending,
State: model.StatePending,
Value: smpl.V,
GeneratorURL: r.GeneratorURL(),
Receivers: ruleReceiverMap[lbs.Map()[ruletypes.LabelThresholdName]],
@@ -316,7 +446,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.Active[h]; ok && alert.State != ruletypes.StateInactive {
if alert, ok := r.Active[h]; ok && alert.State != model.StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
@@ -332,76 +462,76 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
r.Active[h] = a
}
itemsToAdd := []rulestatehistorytypes.RuleStateHistory{}
itemsToAdd := []model.RuleStateHistory{}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.Active {
labelsJSON, err := json.Marshal(a.QueryResultLabels)
labelsJSON, err := json.Marshal(a.QueryResultLables)
if err != nil {
r.logger.ErrorContext(ctx, "error marshaling labels", errors.Attr(err), "labels", a.Labels)
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == ruletypes.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > ruletypes.ResolvedRetention) {
if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > ruletypes.ResolvedRetention) {
delete(r.Active, fp)
}
if a.State != ruletypes.StateInactive {
a.State = ruletypes.StateInactive
if a.State != model.StateInactive {
a.State = model.StateInactive
a.ResolvedAt = ts
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: ruletypes.StateInactive,
State: model.StateInactive,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: rulestatehistorytypes.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLabels.Hash(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}
continue
}
if a.State == ruletypes.StatePending && ts.Sub(a.ActiveAt) >= r.HoldDuration().Duration() {
a.State = ruletypes.StateFiring
if a.State == model.StatePending && ts.Sub(a.ActiveAt) >= r.HoldDuration().Duration() {
a.State = model.StateFiring
a.FiredAt = ts
state := ruletypes.StateFiring
state := model.StateFiring
if a.Missing {
state = ruletypes.StateNoData
state = model.StateNoData
}
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: state,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: rulestatehistorytypes.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLabels.Hash(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}
// We need to change firing alert to recovering if the returned sample meets recovery threshold
changeFiringToRecovering := a.State == ruletypes.StateFiring && a.IsRecovering
changeFiringToRecovering := a.State == model.StateFiring && a.IsRecovering
// We need to change recovering alerts to firing if the returned sample meets target threshold
changeRecoveringToFiring := a.State == ruletypes.StateRecovering && !a.IsRecovering && !a.Missing
changeRecoveringToFiring := a.State == model.StateRecovering && !a.IsRecovering && !a.Missing
// in any of the above case we need to update the status of alert
if changeFiringToRecovering || changeRecoveringToFiring {
state := ruletypes.StateRecovering
state := model.StateRecovering
if changeRecoveringToFiring {
state = ruletypes.StateFiring
state = model.StateFiring
}
a.State = state
r.logger.DebugContext(ctx, "converting alert state", "name", r.Name(), "state", state)
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: state,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: rulestatehistorytypes.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLabels.Hash(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}

View File

@@ -2,19 +2,21 @@ package rules
import (
"context"
"log/slog"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/SigNoz/signoz/ee/query-service/anomaly"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/ee/anomaly"
)
// mockAnomalyProvider is a mock implementation of anomaly.Provider for testing.
@@ -22,13 +24,13 @@ import (
// time periods (current, past period, current season, past season, past 2 seasons,
// past 3 seasons), making it cumbersome to create mock data.
type mockAnomalyProvider struct {
responses []*anomaly.AnomaliesResponse
responses []*anomaly.GetAnomaliesResponse
callCount int
}
func (m *mockAnomalyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *anomaly.AnomaliesRequest) (*anomaly.AnomaliesResponse, error) {
func (m *mockAnomalyProvider) GetAnomalies(ctx context.Context, orgID valuer.UUID, req *anomaly.GetAnomaliesRequest) (*anomaly.GetAnomaliesResponse, error) {
if m.callCount >= len(m.responses) {
return &anomaly.AnomaliesResponse{Results: []*qbtypes.TimeSeriesData{}}, nil
return &anomaly.GetAnomaliesResponse{Results: []*v3.Result{}}, nil
}
resp := m.responses[m.callCount]
m.callCount++
@@ -53,40 +55,39 @@ func TestAnomalyRule_NoData_AlertOnAbsent(t *testing.T) {
Frequency: valuer.MustParseTextDuration("1m"),
}},
RuleCondition: &ruletypes.RuleCondition{
CompareOperator: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
Target: &target,
CompositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
Signal: telemetrytypes.SignalMetrics,
CompareOp: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
Target: &target,
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
Expression: "A",
DataSource: v3.DataSourceMetrics,
Temporality: v3.Unspecified,
},
}},
},
},
SelectedQuery: "A",
Seasonality: "daily",
Thresholds: &ruletypes.RuleThresholdData{
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{{
Name: "Test anomaly no data",
TargetValue: &target,
MatchType: ruletypes.AtleastOnce,
CompareOperator: ruletypes.ValueIsAbove,
Name: "Test anomaly no data",
TargetValue: &target,
MatchType: ruletypes.AtleastOnce,
CompareOp: ruletypes.ValueIsAbove,
}},
},
},
}
responseNoData := &anomaly.AnomaliesResponse{
Results: []*qbtypes.TimeSeriesData{
responseNoData := &anomaly.GetAnomaliesResponse{
Results: []*v3.Result{
{
QueryName: "A",
Aggregations: []*qbtypes.AggregationBucket{{
AnomalyScores: []*qbtypes.TimeSeries{},
}},
QueryName: "A",
AnomalyScores: []*v3.Series{},
},
},
}
@@ -114,10 +115,15 @@ func TestAnomalyRule_NoData_AlertOnAbsent(t *testing.T) {
t.Run(c.description, func(t *testing.T) {
postableRule.RuleCondition.AlertOnAbsent = c.alertOnAbsent
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, nil)
options := clickhouseReader.NewOptions("primaryNamespace")
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, nil, "", time.Second, nil, nil, options)
rule, err := NewAnomalyRule(
"test-anomaly-rule",
valuer.GenerateUUID(),
&postableRule,
reader,
nil,
logger,
nil,
@@ -125,7 +131,7 @@ func TestAnomalyRule_NoData_AlertOnAbsent(t *testing.T) {
require.NoError(t, err)
rule.provider = &mockAnomalyProvider{
responses: []*anomaly.AnomaliesResponse{responseNoData},
responses: []*anomaly.GetAnomaliesResponse{responseNoData},
}
alertsFound, err := rule.Eval(context.Background(), evalTime)
@@ -156,41 +162,40 @@ func TestAnomalyRule_NoData_AbsentFor(t *testing.T) {
Frequency: valuer.MustParseTextDuration("1m"),
}},
RuleCondition: &ruletypes.RuleCondition{
CompareOperator: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
AlertOnAbsent: true,
Target: &target,
CompositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
Signal: telemetrytypes.SignalMetrics,
CompareOp: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
AlertOnAbsent: true,
Target: &target,
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
Expression: "A",
DataSource: v3.DataSourceMetrics,
Temporality: v3.Unspecified,
},
}},
},
},
SelectedQuery: "A",
Seasonality: "daily",
Thresholds: &ruletypes.RuleThresholdData{
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{{
Name: "Test anomaly no data with AbsentFor",
TargetValue: &target,
MatchType: ruletypes.AtleastOnce,
CompareOperator: ruletypes.ValueIsAbove,
Name: "Test anomaly no data with AbsentFor",
TargetValue: &target,
MatchType: ruletypes.AtleastOnce,
CompareOp: ruletypes.ValueIsAbove,
}},
},
},
}
responseNoData := &anomaly.AnomaliesResponse{
Results: []*qbtypes.TimeSeriesData{
responseNoData := &anomaly.GetAnomaliesResponse{
Results: []*v3.Result{
{
QueryName: "A",
Aggregations: []*qbtypes.AggregationBucket{{
AnomalyScores: []*qbtypes.TimeSeries{},
}},
QueryName: "A",
AnomalyScores: []*v3.Series{},
},
},
}
@@ -224,35 +229,32 @@ func TestAnomalyRule_NoData_AbsentFor(t *testing.T) {
t1 := baseTime.Add(5 * time.Minute)
t2 := t1.Add(c.timeBetweenEvals)
responseWithData := &anomaly.AnomaliesResponse{
Results: []*qbtypes.TimeSeriesData{
responseWithData := &anomaly.GetAnomaliesResponse{
Results: []*v3.Result{
{
QueryName: "A",
Aggregations: []*qbtypes.AggregationBucket{{
AnomalyScores: []*qbtypes.TimeSeries{
{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{Name: "Test"},
Value: "labels",
},
},
Values: []*qbtypes.TimeSeriesValue{
{Timestamp: baseTime.UnixMilli(), Value: 1.0},
{Timestamp: baseTime.Add(time.Minute).UnixMilli(), Value: 1.5},
},
AnomalyScores: []*v3.Series{
{
Labels: map[string]string{"test": "label"},
Points: []v3.Point{
{Timestamp: baseTime.UnixMilli(), Value: 1.0},
{Timestamp: baseTime.Add(time.Minute).UnixMilli(), Value: 1.5},
},
},
}},
},
},
},
}
rule, err := NewAnomalyRule("test-anomaly-rule", valuer.GenerateUUID(), &postableRule, nil, logger, nil)
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, nil)
options := clickhouseReader.NewOptions("primaryNamespace")
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, nil, "", time.Second, nil, nil, options)
rule, err := NewAnomalyRule("test-anomaly-rule", valuer.GenerateUUID(), &postableRule, reader, nil, logger, nil)
require.NoError(t, err)
rule.provider = &mockAnomalyProvider{
responses: []*anomaly.AnomaliesResponse{responseWithData, responseNoData},
responses: []*anomaly.GetAnomaliesResponse{responseWithData, responseNoData},
}
alertsFound1, err := rule.Eval(context.Background(), t1)

View File

@@ -11,7 +11,9 @@ import (
"github.com/google/uuid"
"github.com/SigNoz/signoz/pkg/errors"
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -21,7 +23,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
rules := make([]baserules.Rule, 0)
var task baserules.Task
ruleID := baserules.RuleIDFromTaskName(opts.TaskName)
ruleId := baserules.RuleIdFromTaskName(opts.TaskName)
evaluation, err := opts.Rule.Evaluation.GetEvaluation()
if err != nil {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "evaluation is invalid: %v", err)
@@ -30,9 +32,10 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
if opts.Rule.RuleType == ruletypes.RuleTypeThreshold {
// create a threshold rule
tr, err := baserules.NewThresholdRule(
ruleID,
ruleId,
opts.OrgID,
opts.Rule,
opts.Reader,
opts.Querier,
opts.Logger,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
@@ -55,10 +58,11 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
// create promql rule
pr, err := baserules.NewPromRule(
ruleID,
ruleId,
opts.OrgID,
opts.Rule,
opts.Logger,
opts.Reader,
opts.ManagerOpts.Prometheus,
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
@@ -78,9 +82,10 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
} else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly {
// create anomaly rule
ar, err := NewAnomalyRule(
ruleID,
ruleId,
opts.OrgID,
opts.Rule,
opts.Reader,
opts.Querier,
opts.Logger,
opts.Cache,
@@ -100,7 +105,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
task = newTask(baserules.TaskTypeCh, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
} else {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
}
return task, nil
@@ -108,12 +113,12 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
// TestNotification prepares a dummy rule for given rule parameters and
// sends a test notification. returns alert count and error (if any)
func TestNotification(opts baserules.PrepareTestRuleOptions) (int, error) {
func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.ApiError) {
ctx := context.Background()
if opts.Rule == nil {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "rule is required")
return 0, basemodel.BadRequest(fmt.Errorf("rule is required"))
}
parsedRule := opts.Rule
@@ -133,14 +138,15 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, error) {
if parsedRule.RuleType == ruletypes.RuleTypeThreshold {
// add special labels for test alerts
parsedRule.Labels[ruletypes.RuleSourceLabel] = ""
parsedRule.Labels[ruletypes.AlertRuleIDLabel] = ""
parsedRule.Labels[labels.RuleSourceLabel] = ""
parsedRule.Labels[labels.AlertRuleIdLabel] = ""
// create a threshold rule
rule, err = baserules.NewThresholdRule(
alertname,
opts.OrgID,
parsedRule,
opts.Reader,
opts.Querier,
opts.Logger,
baserules.WithSendAlways(),
@@ -152,7 +158,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, error) {
if err != nil {
slog.Error("failed to prepare a new threshold rule for test", "name", alertname, errors.Attr(err))
return 0, err
return 0, basemodel.BadRequest(err)
}
} else if parsedRule.RuleType == ruletypes.RuleTypeProm {
@@ -163,6 +169,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, error) {
opts.OrgID,
parsedRule,
opts.Logger,
opts.Reader,
opts.ManagerOpts.Prometheus,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
@@ -173,7 +180,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, error) {
if err != nil {
slog.Error("failed to prepare a new promql rule for test", "name", alertname, errors.Attr(err))
return 0, err
return 0, basemodel.BadRequest(err)
}
} else if parsedRule.RuleType == ruletypes.RuleTypeAnomaly {
// create anomaly rule
@@ -181,6 +188,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, error) {
alertname,
opts.OrgID,
parsedRule,
opts.Reader,
opts.Querier,
opts.Logger,
opts.Cache,
@@ -192,10 +200,10 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, error) {
)
if err != nil {
slog.Error("failed to prepare a new anomaly rule for test", "name", alertname, errors.Attr(err))
return 0, err
return 0, basemodel.BadRequest(err)
}
} else {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to derive ruletype with given information")
return 0, basemodel.BadRequest(fmt.Errorf("failed to derive ruletype with given information"))
}
// set timestamp to current utc time
@@ -204,7 +212,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, error) {
alertsFound, err := rule.Eval(ctx, ts)
if err != nil {
slog.Error("evaluating rule failed", "rule", rule.Name(), errors.Attr(err))
return 0, err
return 0, basemodel.InternalError(fmt.Errorf("rule evaluation failed"))
}
rule.SendAlerts(ctx, ts, 0, time.Minute, opts.NotifyFunc)

View File

@@ -114,8 +114,11 @@ func TestManager_TestNotification_SendUnmatched_ThresholdRule(t *testing.T) {
},
})
count, err := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
require.Nil(t, err)
count, apiErr := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
if apiErr != nil {
t.Logf("TestNotification error: %v, type: %s", apiErr.Err, apiErr.Typ)
}
require.Nil(t, apiErr)
assert.Equal(t, tc.ExpectAlerts, count)
if tc.ExpectAlerts > 0 {
@@ -265,8 +268,11 @@ func TestManager_TestNotification_SendUnmatched_PromRule(t *testing.T) {
},
})
count, err := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
require.Nil(t, err)
count, apiErr := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
if apiErr != nil {
t.Logf("TestNotification error: %v, type: %s", apiErr.Err, apiErr.Typ)
}
require.Nil(t, apiErr)
assert.Equal(t, tc.ExpectAlerts, count)
if tc.ExpectAlerts > 0 {

View File

@@ -7,7 +7,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
"github.com/SigNoz/signoz/pkg/types/rulestatehistorytypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -124,8 +123,8 @@ func (m *module) RecordRuleStateHistory(ctx context.Context, ruleID string, hand
for _, item := range lastSavedState {
currentState, ok := currentItemsByFingerprint[item.Fingerprint]
if !ok {
if item.State == ruletypes.StateFiring || item.State == ruletypes.StateNoData {
item.State = ruletypes.StateInactive
if item.State == rulestatehistorytypes.StateFiring || item.State == rulestatehistorytypes.StateNoData {
item.State = rulestatehistorytypes.StateInactive
item.StateChanged = true
item.UnixMilli = time.Now().UnixMilli()
revisedItemsToAdd[item.Fingerprint] = item
@@ -146,10 +145,10 @@ func (m *module) RecordRuleStateHistory(ctx context.Context, ruleID string, hand
}
}
newState := ruletypes.StateInactive
newState := rulestatehistorytypes.StateInactive
for _, item := range revisedItemsToAdd {
if item.State == ruletypes.StateFiring || item.State == ruletypes.StateNoData {
newState = ruletypes.StateFiring
if item.State == rulestatehistorytypes.StateFiring || item.State == rulestatehistorytypes.StateNoData {
newState = rulestatehistorytypes.StateFiring
break
}
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/rulestatehistorytypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
sqlbuilder "github.com/huandu/go-sqlbuilder"
)
@@ -301,7 +300,7 @@ func (s *store) ReadRuleStateHistoryTopContributorsByRuleID(ctx context.Context,
sb.From(historyTable())
sb.Where(sb.E("rule_id", ruleID))
sb.Where(sb.E("state_changed", true))
sb.Where(sb.E("state", ruletypes.StateFiring.StringValue()))
sb.Where(sb.E("state", rulestatehistorytypes.StateFiring.StringValue()))
sb.Where(sb.GE("unix_milli", query.Start))
sb.Where(sb.LT("unix_milli", query.End))
@@ -342,7 +341,7 @@ WHERE rule_id = %s
AND unix_milli < %s
GROUP BY unix_milli`,
innerSB.Var(query.Start),
innerSB.Var(ruletypes.StateInactive.StringValue()),
innerSB.Var(rulestatehistorytypes.StateInactive.StringValue()),
historyTable(),
innerSB.Var(ruleID),
innerSB.Var(query.Start),
@@ -412,7 +411,7 @@ func (s *store) GetTotalTriggers(ctx context.Context, ruleID string, query *rule
sb.From(historyTable())
sb.Where(sb.E("rule_id", ruleID))
sb.Where(sb.E("state_changed", true))
sb.Where(sb.E("state", ruletypes.StateFiring.StringValue()))
sb.Where(sb.E("state", rulestatehistorytypes.StateFiring.StringValue()))
sb.Where(sb.GE("unix_milli", query.Start))
sb.Where(sb.LT("unix_milli", query.End))
selectQuery, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
@@ -433,7 +432,7 @@ func (s *store) GetTriggersByInterval(ctx context.Context, ruleID string, query
sb.From(historyTable())
sb.Where(sb.E("rule_id", ruleID))
sb.Where(sb.E("state_changed", true))
sb.Where(sb.E("state", ruletypes.StateFiring.StringValue()))
sb.Where(sb.E("state", rulestatehistorytypes.StateFiring.StringValue()))
sb.Where(sb.GE("unix_milli", query.Start))
sb.Where(sb.LT("unix_milli", query.End))
sb.GroupBy("ts")
@@ -529,7 +528,7 @@ func (s *store) buildMatchedEventsCTE(ruleID string, query *rulestatehistorytype
firingSB := sqlbuilder.NewSelectBuilder()
firingSB.Select("rule_id", "unix_milli AS firing_time")
firingSB.From(historyTable())
firingSB.Where(firingSB.E("overall_state", ruletypes.StateFiring.StringValue()))
firingSB.Where(firingSB.E("overall_state", rulestatehistorytypes.StateFiring.StringValue()))
firingSB.Where(firingSB.E("overall_state_changed", true))
firingSB.Where(firingSB.E("rule_id", ruleID))
firingSB.Where(firingSB.GE("unix_milli", query.Start))
@@ -538,7 +537,7 @@ func (s *store) buildMatchedEventsCTE(ruleID string, query *rulestatehistorytype
resolutionSB := sqlbuilder.NewSelectBuilder()
resolutionSB.Select("rule_id", "unix_milli AS resolution_time")
resolutionSB.From(historyTable())
resolutionSB.Where(resolutionSB.E("overall_state", ruletypes.StateInactive.StringValue()))
resolutionSB.Where(resolutionSB.E("overall_state", rulestatehistorytypes.StateInactive.StringValue()))
resolutionSB.Where(resolutionSB.E("overall_state_changed", true))
resolutionSB.Where(resolutionSB.E("rule_id", ruleID))
resolutionSB.Where(resolutionSB.GE("unix_milli", query.Start))

View File

@@ -299,36 +299,6 @@ type ApiResponse struct {
Error string `json:"error,omitempty"`
}
// toApiError translates a pkg/errors typed error into the legacy
// model.ApiError to preserve the v1 JSON response shape.
func toApiError(err error) *model.ApiError {
	t, _, _, _, _, _ := errors.Unwrapb(err)

	// wrap pairs the original error with the legacy v1 error type.
	wrap := func(typ model.ErrorType) *model.ApiError {
		return &model.ApiError{Typ: typ, Err: err}
	}

	switch t {
	case errors.TypeInvalidInput:
		return wrap(model.ErrorBadData)
	case errors.TypeNotFound:
		return wrap(model.ErrorNotFound)
	case errors.TypeAlreadyExists:
		return wrap(model.ErrorConflict)
	case errors.TypeUnauthenticated:
		return wrap(model.ErrorUnauthorized)
	case errors.TypeForbidden:
		return wrap(model.ErrorForbidden)
	case errors.TypeUnsupported:
		return wrap(model.ErrorNotImplemented)
	case errors.TypeTimeout:
		return wrap(model.ErrorTimeout)
	case errors.TypeCanceled:
		return wrap(model.ErrorCanceled)
	default:
		// Any unrecognized category degrades to an internal error.
		return wrap(model.ErrorInternal)
	}
}
// todo(remove): Implemented at render package (github.com/SigNoz/signoz/pkg/http/render) with the new error structure
func RespondError(w http.ResponseWriter, apiErr model.BaseApiError, data interface{}) {
json := jsoniter.ConfigCompatibleWithStandardLibrary
@@ -921,6 +891,48 @@ func (aH *APIHandler) getOverallStateTransitions(w http.ResponseWriter, r *http.
aH.Respond(w, stateItems)
}
// metaForLinks gathers the filter items, group-by attributes and attribute
// keys needed to build "related logs/traces" context links for an alert rule.
// For non-logs/traces alert types all three results stay empty.
func (aH *APIHandler) metaForLinks(ctx context.Context, rule *ruletypes.GettableRule) ([]v3.FilterItem, []v3.AttributeKey, map[string]v3.AttributeKey) {
	filterItems := []v3.FilterItem{}
	groupBy := []v3.AttributeKey{}
	keys := make(map[string]v3.AttributeKey)

	// Resolve attribute keys from the telemetry store; on lookup failure we
	// log and continue with empty keys rather than failing the request.
	switch rule.AlertType {
	case ruletypes.AlertTypeLogs:
		logFields, apiErr := aH.reader.GetLogFieldsFromNames(ctx, logsv3.GetFieldNames(rule.PostableRule.RuleCondition.CompositeQuery))
		if apiErr != nil {
			aH.logger.ErrorContext(ctx, "failed to get log fields using empty keys", errors.Attr(apiErr))
		} else {
			params := &v3.QueryRangeParamsV3{
				CompositeQuery: rule.RuleCondition.CompositeQuery,
			}
			keys = model.GetLogFieldsV3(ctx, params, logFields)
		}
	case ruletypes.AlertTypeTraces:
		traceFields, err := aH.reader.GetSpanAttributeKeysByNames(ctx, logsv3.GetFieldNames(rule.PostableRule.RuleCondition.CompositeQuery))
		if err != nil {
			aH.logger.ErrorContext(ctx, "failed to get span attributes using empty keys", errors.Attr(err))
		} else {
			keys = traceFields
		}
	}

	// Builder-type queries additionally contribute their filters and group-bys
	// from the selected query.
	isLogsOrTraces := rule.AlertType == ruletypes.AlertTypeLogs || rule.AlertType == ruletypes.AlertTypeTraces
	if isLogsOrTraces && rule.RuleCondition.CompositeQuery != nil && rule.RuleCondition.QueryType() == v3.QueryTypeBuilder {
		selected := rule.RuleCondition.GetSelectedQueryName()
		if bq := rule.RuleCondition.CompositeQuery.BuilderQueries[selected]; bq != nil {
			if bq.Filters != nil {
				filterItems = bq.Filters.Items
			}
			if bq.GroupBy != nil {
				groupBy = bq.GroupBy
			}
		}
	}

	return filterItems, groupBy, keys
}
func (aH *APIHandler) getRuleStateHistory(w http.ResponseWriter, r *http.Request) {
idStr := mux.Vars(r)["id"]
id, err := valuer.NewUUID(idStr)
@@ -954,6 +966,8 @@ func (aH *APIHandler) getRuleStateHistory(w http.ResponseWriter, r *http.Request
if err != nil {
continue
}
filterItems, groupBy, keys := aH.metaForLinks(r.Context(), rule)
newFilters := contextlinks.PrepareFilters(lbls, filterItems, groupBy, keys)
end := time.Unix(res.Items[idx].UnixMilli/1000, 0)
// why are we subtracting 3 minutes?
// the query range is calculated based on the rule's evalWindow and evalDelay
@@ -961,46 +975,54 @@ func (aH *APIHandler) getRuleStateHistory(w http.ResponseWriter, r *http.Request
// to get the correct query range
start := end.Add(-rule.EvalWindow.Duration() - 3*time.Minute)
if rule.AlertType == ruletypes.AlertTypeLogs {
// TODO(srikanthccv): re-visit this and support multiple queries
var q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
if rule.Version != "v5" {
res.Items[idx].RelatedLogsLink = contextlinks.PrepareLinksToLogs(start, end, newFilters)
} else {
// TODO(srikanthccv): re-visit this and support multiple queries
var q qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
for _, query := range rule.RuleCondition.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeBuilder {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
q = spec
for _, query := range rule.RuleCondition.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeBuilder {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
q = spec
}
}
}
filterExpr := ""
if q.Filter != nil && q.Filter.Expression != "" {
filterExpr = q.Filter.Expression
}
whereClause := contextlinks.PrepareFilterExpression(lbls, filterExpr, q.GroupBy)
res.Items[idx].RelatedLogsLink = contextlinks.PrepareLinksToLogsV5(start, end, whereClause)
}
filterExpr := ""
if q.Filter != nil && q.Filter.Expression != "" {
filterExpr = q.Filter.Expression
}
whereClause := contextlinks.PrepareFilterExpression(lbls, filterExpr, q.GroupBy)
res.Items[idx].RelatedLogsLink = contextlinks.PrepareLinksToLogsV5(start, end, whereClause)
} else if rule.AlertType == ruletypes.AlertTypeTraces {
// TODO(srikanthccv): re-visit this and support multiple queries
var q qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
if rule.Version != "v5" {
res.Items[idx].RelatedTracesLink = contextlinks.PrepareLinksToTraces(start, end, newFilters)
} else {
// TODO(srikanthccv): re-visit this and support multiple queries
var q qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
for _, query := range rule.RuleCondition.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeBuilder {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
q = spec
for _, query := range rule.RuleCondition.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeBuilder {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
q = spec
}
}
}
}
filterExpr := ""
if q.Filter != nil && q.Filter.Expression != "" {
filterExpr = q.Filter.Expression
}
filterExpr := ""
if q.Filter != nil && q.Filter.Expression != "" {
filterExpr = q.Filter.Expression
}
whereClause := contextlinks.PrepareFilterExpression(lbls, filterExpr, q.GroupBy)
res.Items[idx].RelatedTracesLink = contextlinks.PrepareLinksToTracesV5(start, end, whereClause)
whereClause := contextlinks.PrepareFilterExpression(lbls, filterExpr, q.GroupBy)
res.Items[idx].RelatedTracesLink = contextlinks.PrepareLinksToTracesV5(start, end, whereClause)
}
}
}
}
@@ -1029,6 +1051,26 @@ func (aH *APIHandler) getRuleStateHistoryTopContributors(w http.ResponseWriter,
return
}
rule, err := aH.ruleManager.GetRule(r.Context(), id)
if err == nil {
for idx := range res {
lbls := make(map[string]string)
err := json.Unmarshal([]byte(res[idx].Labels), &lbls)
if err != nil {
continue
}
filterItems, groupBy, keys := aH.metaForLinks(r.Context(), rule)
newFilters := contextlinks.PrepareFilters(lbls, filterItems, groupBy, keys)
end := time.Unix(params.End/1000, 0)
start := time.Unix(params.Start/1000, 0)
if rule.AlertType == ruletypes.AlertTypeLogs {
res[idx].RelatedLogsLink = contextlinks.PrepareLinksToLogs(start, end, newFilters)
} else if rule.AlertType == ruletypes.AlertTypeTraces {
res[idx].RelatedTracesLink = contextlinks.PrepareLinksToTraces(start, end, newFilters)
}
}
}
aH.Respond(w, res)
}
@@ -1259,9 +1301,9 @@ func (aH *APIHandler) testRule(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
alertCount, err := aH.ruleManager.TestNotification(ctx, orgID, string(body))
if err != nil {
RespondError(w, toApiError(err), nil)
alertCount, apiRrr := aH.ruleManager.TestNotification(ctx, orgID, string(body))
if apiRrr != nil {
RespondError(w, apiRrr, nil)
return
}
@@ -1283,7 +1325,7 @@ func (aH *APIHandler) deleteRule(w http.ResponseWriter, r *http.Request) {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("rule not found")}, nil)
return
}
RespondError(w, toApiError(err), nil)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
@@ -1315,7 +1357,7 @@ func (aH *APIHandler) patchRule(w http.ResponseWriter, r *http.Request) {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("rule not found")}, nil)
return
}
RespondError(w, toApiError(err), nil)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
@@ -1345,7 +1387,7 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("rule not found")}, nil)
return
}
RespondError(w, toApiError(err), nil)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
@@ -1365,7 +1407,7 @@ func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
rule, err := aH.ruleManager.CreateRule(r.Context(), string(body))
if err != nil {
RespondError(w, toApiError(err), nil)
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}

View File

@@ -345,6 +345,7 @@ func makeRulesManager(
MetadataStore: metadataStore,
Prometheus: prometheus,
Context: context.Background(),
Reader: ch,
Querier: querier,
Logger: providerSettings.Logger,
Cache: cache,
@@ -353,7 +354,7 @@ func makeRulesManager(
Alertmanager: alertmanager,
RuleStore: ruleStore,
MaintenanceStore: maintenanceStore,
SQLStore: sqlstore,
SqlStore: sqlstore,
QueryParser: queryParser,
RuleStateHistoryModule: ruleStateHistoryModule,
}

View File

@@ -2,8 +2,10 @@ package common
import (
"math"
"regexp"
"sort"
"time"
"unicode"
"github.com/SigNoz/signoz/pkg/query-service/constants"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
@@ -74,6 +76,23 @@ func LCMList(nums []int64) int64 {
return result
}
// nonPromLabelChars matches every character that is not legal in a Prometheus
// label name ([a-zA-Z0-9_]). Compiled once at package scope so that
// NormalizeLabelName does not recompile the pattern on every call.
var nonPromLabelChars = regexp.MustCompile(`[^a-zA-Z0-9_]`)

// NormalizeLabelName sanitizes name into a valid Prometheus label name:
// every disallowed character is replaced with an underscore, and a leading
// underscore is prepended when the first character is not a letter or an
// underscore (e.g. when it is a digit). An empty input is returned unchanged.
// See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
func NormalizeLabelName(name string) string {
	// Replace all non-alphanumeric characters except underscores with underscores.
	normalized := nonPromLabelChars.ReplaceAllString(name, "_")

	// If the first character is not a letter or an underscore, prepend an
	// underscore. After the replacement above only [a-zA-Z0-9_] remain, so a
	// single-byte check is safe here.
	if len(normalized) > 0 && !unicode.IsLetter(rune(normalized[0])) && normalized[0] != '_' {
		normalized = "_" + normalized
	}
	return normalized
}
func GetSeriesFromCachedData(data []querycache.CachedSeriesData, start, end int64) []*v3.Series {
series := make(map[uint64]*v3.Series)

View File

@@ -9,6 +9,12 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/sqlstore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -45,8 +51,8 @@ type BaseRule struct {
// holds the static set of labels and annotations for the rule
// these are the same for all alerts created for this rule
labels ruletypes.Labels
annotations ruletypes.Labels
labels qslabels.BaseLabels
annotations qslabels.BaseLabels
// preferredChannels is the list of channels to send the alert to
// if the rule is triggered
preferredChannels []string
@@ -65,6 +71,8 @@ type BaseRule struct {
// This is used for missing data alerts.
lastTimestampWithDatapoints time.Time
reader interfaces.Reader
logger *slog.Logger
// sendUnmatched sends observed metric values even if they don't match the
@@ -74,6 +82,12 @@ type BaseRule struct {
// sendAlways will send alert irrespective of resendDelay or other params
sendAlways bool
// TemporalityMap is a map of metric name to temporality to avoid fetching
// temporality for the same metric multiple times.
// Querying the v4 table on low cardinal temporality column should be fast,
// but we can still avoid the query if we have the data in memory.
TemporalityMap map[string]map[v3.Temporality]bool
sqlstore sqlstore.SQLStore
metadataStore telemetrytypes.MetadataStore
@@ -138,9 +152,9 @@ func WithRuleStateHistoryModule(module rulestatehistory.Module) RuleOption {
}
}
func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, opts ...RuleOption) (*BaseRule, error) {
func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) {
if p.RuleCondition == nil || !p.RuleCondition.IsValid() {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid rule condition")
return nil, fmt.Errorf("invalid rule condition")
}
threshold, err := p.RuleCondition.Thresholds.GetRuleThreshold()
if err != nil {
@@ -159,11 +173,13 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, opts .
typ: p.AlertType,
ruleCondition: p.RuleCondition,
evalWindow: p.EvalWindow,
labels: ruletypes.FromMap(p.Labels),
annotations: ruletypes.FromMap(p.Annotations),
labels: qslabels.FromMap(p.Labels),
annotations: qslabels.FromMap(p.Annotations),
preferredChannels: p.PreferredChannels,
health: ruletypes.HealthUnknown,
Active: map[uint64]*ruletypes.Alert{},
reader: reader,
TemporalityMap: make(map[string]map[v3.Temporality]bool),
Threshold: threshold,
evaluation: evaluation,
}
@@ -184,6 +200,20 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, opts .
return baseRule, nil
}
// matchType reports the rule's configured match type, falling back to
// AtleastOnce when no rule condition is set.
func (r *BaseRule) matchType() ruletypes.MatchType {
	if cond := r.ruleCondition; cond != nil {
		return cond.MatchType
	}
	return ruletypes.AtleastOnce
}
// compareOp reports the rule's configured comparison operator, falling back
// to ValueIsEq when no rule condition is set.
func (r *BaseRule) compareOp() ruletypes.CompareOp {
	if cond := r.ruleCondition; cond != nil {
		return cond.CompareOp
	}
	return ruletypes.ValueIsEq
}
func (r *BaseRule) currentAlerts() []*ruletypes.Alert {
r.mtx.Lock()
defer r.mtx.Unlock()
@@ -215,10 +245,10 @@ func (r *BaseRule) ActiveAlertsLabelFP() map[uint64]struct{} {
activeAlerts := make(map[uint64]struct{}, len(r.Active))
for _, alert := range r.Active {
if alert == nil || alert.QueryResultLabels == nil {
if alert == nil || alert.QueryResultLables == nil {
continue
}
activeAlerts[alert.QueryResultLabels.Hash()] = struct{}{}
activeAlerts[alert.QueryResultLables.Hash()] = struct{}{}
}
return activeAlerts
}
@@ -239,8 +269,8 @@ func (r *BaseRule) ID() string { return r.id }
func (r *BaseRule) OrgID() valuer.UUID { return r.orgID }
func (r *BaseRule) Name() string { return r.name }
func (r *BaseRule) Condition() *ruletypes.RuleCondition { return r.ruleCondition }
func (r *BaseRule) Labels() ruletypes.Labels { return r.labels }
func (r *BaseRule) Annotations() ruletypes.Labels { return r.annotations }
func (r *BaseRule) Labels() qslabels.BaseLabels { return r.labels }
func (r *BaseRule) Annotations() qslabels.BaseLabels { return r.annotations }
func (r *BaseRule) PreferredChannels() []string { return r.preferredChannels }
func (r *BaseRule) GeneratorURL() string {
@@ -248,7 +278,10 @@ func (r *BaseRule) GeneratorURL() string {
}
func (r *BaseRule) Unit() string {
return r.ruleCondition.CompositeQuery.Unit
if r.ruleCondition != nil && r.ruleCondition.CompositeQuery != nil {
return r.ruleCondition.CompositeQuery.Unit
}
return ""
}
func (r *BaseRule) Timestamps(ts time.Time) (time.Time, time.Time) {
@@ -315,10 +348,10 @@ func (r *BaseRule) GetEvaluationTimestamp() time.Time {
return r.evaluationTimestamp
}
func (r *BaseRule) State() ruletypes.AlertState {
maxState := ruletypes.StateInactive
func (r *BaseRule) State() model.AlertState {
maxState := model.StateInactive
for _, a := range r.Active {
if a.State.Severity() > maxState.Severity() {
if a.State > maxState {
maxState = a.State
}
}
@@ -375,12 +408,12 @@ func (r *BaseRule) ForEachActiveAlert(f func(*ruletypes.Alert)) {
}
}
func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, currentState ruletypes.AlertState, itemsToAdd []rulestatehistorytypes.RuleStateHistory) error {
func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []model.RuleStateHistory) error {
if r.ruleStateHistoryModule == nil {
return nil
}
if err := r.ruleStateHistoryModule.RecordRuleStateHistory(ctx, r.ID(), r.handledRestart, itemsToAdd); err != nil {
if err := r.ruleStateHistoryModule.RecordRuleStateHistory(ctx, r.ID(), r.handledRestart, toRuleStateHistoryTypes(itemsToAdd)); err != nil {
r.logger.ErrorContext(ctx, "error while recording rule state history", errors.Attr(err), slog.Any("itemsToAdd", itemsToAdd))
return err
}
@@ -389,6 +422,100 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
return nil
}
// TODO(srikanthccv): remove these when v3 is cleaned up
// toRuleStateHistoryTypes converts legacy model state-history entries into
// their rulestatehistorytypes equivalents, translating each state field
// through toRuleStateHistoryAlertState.
func toRuleStateHistoryTypes(entries []model.RuleStateHistory) []rulestatehistorytypes.RuleStateHistory {
	converted := make([]rulestatehistorytypes.RuleStateHistory, len(entries))
	for i, src := range entries {
		converted[i] = rulestatehistorytypes.RuleStateHistory{
			RuleID:              src.RuleID,
			RuleName:            src.RuleName,
			OverallState:        toRuleStateHistoryAlertState(src.OverallState),
			OverallStateChanged: src.OverallStateChanged,
			State:               toRuleStateHistoryAlertState(src.State),
			StateChanged:        src.StateChanged,
			UnixMilli:           src.UnixMilli,
			Labels:              rulestatehistorytypes.LabelsString(src.Labels),
			Fingerprint:         src.Fingerprint,
			Value:               src.Value,
		}
	}
	return converted
}
// toRuleStateHistoryAlertState maps a legacy model.AlertState onto its
// rulestatehistorytypes counterpart. Unknown states map to StateInactive.
func toRuleStateHistoryAlertState(state model.AlertState) rulestatehistorytypes.AlertState {
	switch state {
	case model.StatePending:
		return rulestatehistorytypes.StatePending
	case model.StateRecovering:
		return rulestatehistorytypes.StateRecovering
	case model.StateFiring:
		return rulestatehistorytypes.StateFiring
	case model.StateNoData:
		return rulestatehistorytypes.StateNoData
	case model.StateDisabled:
		return rulestatehistorytypes.StateDisabled
	default:
		// model.StateInactive and anything unrecognized both become inactive.
		return rulestatehistorytypes.StateInactive
	}
}
// PopulateTemporality fills in the Temporality field of every metrics builder
// query in qp that does not already have one. It first consults the rule's
// in-memory TemporalityMap cache; metrics still missing a temporality are
// fetched in one batch from the reader and the results are cached back onto
// r.TemporalityMap. Delta is preferred over Cumulative; when neither is
// available the query is marked Unspecified. Returns the reader error, if any.
func (r *BaseRule) PopulateTemporality(ctx context.Context, orgID valuer.UUID, qp *v3.QueryRangeParamsV3) error {
missingTemporality := make([]string, 0)
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
// First pass: resolve temporality from the local cache and collect the
// metric names that still need a lookup.
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for _, query := range qp.CompositeQuery.BuilderQueries {
// if there is no temporality specified in the query but we have it in the map
// then use the value from the map
if query.Temporality == "" && r.TemporalityMap[query.AggregateAttribute.Key] != nil {
// We prefer delta if it is available
if r.TemporalityMap[query.AggregateAttribute.Key][v3.Delta] {
query.Temporality = v3.Delta
} else if r.TemporalityMap[query.AggregateAttribute.Key][v3.Cumulative] {
query.Temporality = v3.Cumulative
} else {
query.Temporality = v3.Unspecified
}
}
// we don't have temporality for this metric
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
missingTemporality = append(missingTemporality, query.AggregateAttribute.Key)
}
if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok {
metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool)
}
}
}
var nameToTemporality map[string]map[v3.Temporality]bool
var err error
// Batch-fetch temporality for all metrics the cache could not answer.
if len(missingTemporality) > 0 {
nameToTemporality, err = r.reader.FetchTemporality(ctx, orgID, missingTemporality)
if err != nil {
return err
}
}
// Second pass: apply the fetched temporality to the still-unset metrics
// queries and remember it in the cache for subsequent evaluations.
if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 {
for name := range qp.CompositeQuery.BuilderQueries {
query := qp.CompositeQuery.BuilderQueries[name]
if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" {
// Delta is preferred, then Cumulative, else Unspecified.
if nameToTemporality[query.AggregateAttribute.Key][v3.Delta] {
query.Temporality = v3.Delta
} else if nameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] {
query.Temporality = v3.Cumulative
} else {
query.Temporality = v3.Unspecified
}
r.TemporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key]
}
}
}
return nil
}
// ShouldSkipNewGroups returns true if new group filtering should be applied
func (r *BaseRule) ShouldSkipNewGroups() bool {
return r.newGroupEvalDelay.IsPositive()
@@ -396,7 +523,7 @@ func (r *BaseRule) ShouldSkipNewGroups() bool {
// isFilterNewSeriesSupported checks if the query is supported for new series filtering
func (r *BaseRule) isFilterNewSeriesSupported() bool {
if r.ruleCondition.CompositeQuery.QueryType == ruletypes.QueryTypeBuilder {
if r.ruleCondition.CompositeQuery.QueryType == v3.QueryTypeBuilder {
for _, query := range r.ruleCondition.CompositeQuery.Queries {
if query.Type != qbtypes.QueryTypeBuilder {
continue
@@ -465,7 +592,7 @@ func (r *BaseRule) extractMetricAndGroupBys(ctx context.Context) (map[string][]s
// FilterNewSeries filters out items that are too new based on metadata first_seen timestamps.
// Returns the filtered series (old ones) excluding new series that are still within the grace period.
func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*qbtypes.TimeSeries) ([]*qbtypes.TimeSeries, error) {
func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*v3.Series) ([]*v3.Series, error) {
// Extract metric names and groupBy keys
metricToGroupedFields, err := r.extractMetricAndGroupBys(ctx)
if err != nil {
@@ -482,22 +609,14 @@ func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*
seriesIdxToLookupKeys := make(map[int][]telemetrytypes.MetricMetadataLookupKey) // series index -> lookup keys
for i := 0; i < len(series); i++ {
valueForKey := func(key string) (string, bool) {
for _, item := range series[i].Labels {
if item.Key.Name == key {
return fmt.Sprint(item.Value), true
}
}
return "", false
}
metricLabelMap := series[i].Labels
// Collect groupBy attribute-value pairs for this series
seriesKeys := make([]telemetrytypes.MetricMetadataLookupKey, 0)
for metricName, groupedFields := range metricToGroupedFields {
for _, groupByKey := range groupedFields {
if attrValue, ok := valueForKey(groupByKey); ok {
if attrValue, ok := metricLabelMap[groupByKey]; ok {
lookupKey := telemetrytypes.MetricMetadataLookupKey{
MetricName: metricName,
AttributeName: groupByKey,
@@ -537,7 +656,7 @@ func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*
}
// Filter series based on first_seen + delay
filteredSeries := make([]*qbtypes.TimeSeries, 0, len(series))
filteredSeries := make([]*v3.Series, 0, len(series))
evalTimeMs := ts.UnixMilli()
newGroupEvalDelayMs := r.newGroupEvalDelay.Milliseconds()
@@ -607,9 +726,9 @@ func (r *BaseRule) HandleMissingDataAlert(ctx context.Context, ts time.Time, has
}
r.logger.InfoContext(ctx, "no data found for rule condition", "rule_id", r.ID())
lbls := ruletypes.NewBuilder()
lbls := labels.NewBuilder(labels.Labels{})
if !r.lastTimestampWithDatapoints.IsZero() {
lbls.Set(ruletypes.LabelLastSeen, r.lastTimestampWithDatapoints.Format(ruletypes.AlertTimeFormat))
lbls.Set(ruletypes.LabelLastSeen, r.lastTimestampWithDatapoints.Format(constants.AlertTimeFormat))
}
return &ruletypes.Sample{Metric: lbls.Labels(), IsMissing: true}
}

View File

@@ -2,14 +2,23 @@ package rules
import (
"context"
"fmt"
"log/slog"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/cache/cachetest"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
@@ -18,29 +27,22 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
)
// createTestSeries creates a *qbtypes.TimeSeries with the given labels and optional points
// createTestSeries creates a *v3.Series with the given labels and optional points
// so we don't exactly need the points in the series because the labels are used to determine if the series is new or old
// we use the labels to create a lookup key for the series and then check the first_seen timestamp for the series in the metadata table
func createTestSeries(kvMap map[string]string, values []*qbtypes.TimeSeriesValue) *qbtypes.TimeSeries {
if values == nil {
values = []*qbtypes.TimeSeriesValue{}
func createTestSeries(labels map[string]string, points []v3.Point) *v3.Series {
if points == nil {
points = []v3.Point{}
}
lbls := make([]*qbtypes.Label, 0)
for k, v := range kvMap {
lbls = append(lbls, &qbtypes.Label{
Key: telemetrytypes.TelemetryFieldKey{Name: k},
Value: v,
})
}
return &qbtypes.TimeSeries{
Labels: lbls,
Values: values,
return &v3.Series{
Labels: labels,
Points: points,
}
}
// seriesEqual compares two v3.Series by their labels
// Returns true if the series have the same labels (order doesn't matter)
func seriesEqual(s1, s2 *qbtypes.TimeSeries) bool {
func seriesEqual(s1, s2 *v3.Series) bool {
if s1 == nil && s2 == nil {
return true
}
@@ -115,7 +117,7 @@ func mergeFirstSeenMaps(maps ...map[telemetrytypes.MetricMetadataLookupKey]int64
}
// createPostableRule creates a PostableRule with the given CompositeQuery
func createPostableRule(compositeQuery *ruletypes.AlertCompositeQuery) ruletypes.PostableRule {
func createPostableRule(compositeQuery *v3.CompositeQuery) ruletypes.PostableRule {
return ruletypes.PostableRule{
AlertName: "Test Rule",
AlertType: ruletypes.AlertTypeMetric,
@@ -133,10 +135,10 @@ func createPostableRule(compositeQuery *ruletypes.AlertCompositeQuery) ruletypes
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{
{
Name: "test-threshold",
TargetValue: func() *float64 { v := 1.0; return &v }(),
CompareOperator: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
Name: "test-threshold",
TargetValue: func() *float64 { v := 1.0; return &v }(),
CompareOp: ruletypes.ValueIsAbove,
MatchType: ruletypes.AtleastOnce,
},
},
},
@@ -147,12 +149,12 @@ func createPostableRule(compositeQuery *ruletypes.AlertCompositeQuery) ruletypes
// filterNewSeriesTestCase represents a test case for FilterNewSeries
type filterNewSeriesTestCase struct {
name string
compositeQuery *ruletypes.AlertCompositeQuery
series []*qbtypes.TimeSeries
compositeQuery *v3.CompositeQuery
series []*v3.Series
firstSeenMap map[telemetrytypes.MetricMetadataLookupKey]int64
newGroupEvalDelay valuer.TextDuration
evalTime time.Time
expectedFiltered []*qbtypes.TimeSeries // series that should be in the final filtered result (old enough)
expectedFiltered []*v3.Series // series that should be in the final filtered result (old enough)
expectError bool
}
@@ -168,8 +170,8 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
tests := []filterNewSeriesTestCase{
{
name: "mixed old and new series - Builder query",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -192,7 +194,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-new", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-missing", "env": "stage"}, nil),
@@ -204,15 +206,15 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-missing", "env": "stage"}, nil),
}, // svc-old and svc-missing should be included; svc-new is filtered out
},
{
name: "all new series - PromQL query",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypePromQL,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypePromQL,
@@ -226,7 +228,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc-new1", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-new2", "env": "stage"}, nil),
},
@@ -236,12 +238,12 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{}, // all should be filtered out (new series)
expectedFiltered: []*v3.Series{}, // all should be filtered out (new series)
},
{
name: "all old series - ClickHouse query",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeClickHouseSQL,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeClickHouseSQL,
@@ -253,7 +255,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc-old1", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-old2", "env": "stage"}, nil),
},
@@ -263,15 +265,15 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc-old1", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-old2", "env": "stage"}, nil),
}, // all should be included (old series)
},
{
name: "no grouping in query - Builder",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -291,20 +293,20 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, // early return, no filtering - all series included
},
{
name: "no metric names - Builder",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -321,20 +323,20 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, // early return, no filtering - all series included
},
{
name: "series with no matching labels - Builder",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -357,20 +359,20 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"status": "200"}, nil), // no service_name or env
},
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"status": "200"}, nil),
}, // series included as we can't decide if it's new or old
},
{
name: "series with missing metadata - PromQL",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypePromQL,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypePromQL,
@@ -384,7 +386,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-no-metadata", "env": "prod"}, nil),
},
@@ -392,15 +394,15 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
// svc-no-metadata has no entry in firstSeenMap
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
createTestSeries(map[string]string{"service_name": "svc-no-metadata", "env": "prod"}, nil),
}, // both should be included - svc-old is old, svc-no-metadata can't be decided
},
{
name: "series with partial metadata - ClickHouse",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeClickHouseSQL,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeClickHouseSQL,
@@ -412,7 +414,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc-partial", "env": "prod"}, nil),
},
// Only provide metadata for service_name, not env
@@ -422,14 +424,14 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc-partial", "env": "prod"}, nil),
}, // has some metadata, uses max first_seen which is old
},
{
name: "empty series array - Builder",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -452,16 +454,16 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{},
series: []*v3.Series{},
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{},
expectedFiltered: []*v3.Series{},
},
{
name: "zero delay - Builder",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -484,20 +486,20 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
firstSeenMap: createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc1", "prod"),
newGroupEvalDelay: valuer.TextDuration{}, // zero delay
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
}, // with zero delay, all series pass
},
{
name: "multiple metrics with same groupBy keys - Builder",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -525,7 +527,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
firstSeenMap: mergeFirstSeenMaps(
@@ -534,14 +536,14 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
},
{
name: "series with multiple groupBy attributes where one is new and one is old - Builder",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -564,7 +566,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
},
// service_name is old, env is new - should use max (new)
@@ -574,12 +576,12 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{}, // max first_seen is new, so should be filtered out
expectedFiltered: []*v3.Series{}, // max first_seen is new, so should be filtered out
},
{
name: "Logs query - should skip filtering and return empty skip indexes",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -599,22 +601,22 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1"}, nil),
createTestSeries(map[string]string{"service_name": "svc2"}, nil),
},
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1"}, nil),
createTestSeries(map[string]string{"service_name": "svc2"}, nil),
}, // Logs queries should return early, no filtering - all included
},
{
name: "Traces query - should skip filtering and return empty skip indexes",
compositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
compositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -634,14 +636,14 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
},
},
},
series: []*qbtypes.TimeSeries{
series: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1"}, nil),
createTestSeries(map[string]string{"service_name": "svc2"}, nil),
},
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
newGroupEvalDelay: defaultNewGroupEvalDelay,
evalTime: defaultEvalTime,
expectedFiltered: []*qbtypes.TimeSeries{
expectedFiltered: []*v3.Series{
createTestSeries(map[string]string{"service_name": "svc1"}, nil),
createTestSeries(map[string]string{"service_name": "svc2"}, nil),
}, // Traces queries should return early, no filtering - all included
@@ -653,6 +655,9 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
// Create postableRule from compositeQuery
postableRule := createPostableRule(tt.compositeQuery)
// Setup telemetry store mock
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
// Setup mock metadata store
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
@@ -676,12 +681,37 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
// Setup metadata query mock
mockMetadataStore.SetFirstSeenFromMetricMetadata(tt.firstSeenMap)
// Create reader with mocked telemetry store
readerCache, err := cachetest.New(
cache.Config{
Provider: "memory",
Memory: cache.Memory{
NumCounters: 10 * 1000,
MaxCost: 1 << 26,
},
},
)
require.NoError(t, err)
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(
slog.Default(),
nil,
telemetryStore,
prometheustest.New(context.Background(), settings, prometheus.Config{Timeout: 2 * time.Minute}, telemetryStore),
"",
time.Second,
nil,
readerCache,
options,
)
postableRule.NotificationSettings = &ruletypes.NotificationSettings{
NewGroupEvalDelay: tt.newGroupEvalDelay,
}
// Create BaseRule using NewBaseRule
rule, err := NewBaseRule("test-rule", valuer.GenerateUUID(), &postableRule, WithQueryParser(queryParser), WithLogger(logger), WithMetadataStore(mockMetadataStore))
rule, err := NewBaseRule("test-rule", valuer.GenerateUUID(), &postableRule, reader, WithQueryParser(queryParser), WithLogger(logger), WithMetadataStore(mockMetadataStore))
require.NoError(t, err)
filteredSeries, err := rule.FilterNewSeries(context.Background(), tt.evalTime, tt.series)
@@ -725,13 +755,9 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
// labelsKey creates a deterministic string key from a labels map
// This is used to group series by their unique label combinations
func labelsKey(lbls []*qbtypes.Label) string {
func labelsKey(lbls map[string]string) string {
if len(lbls) == 0 {
return ""
}
temp := ruletypes.NewBuilder()
for _, item := range lbls {
temp.Set(item.Key.Name, fmt.Sprint(item.Value))
}
return temp.Labels().String()
return labels.FromMap(lbls).String()
}

View File

@@ -10,7 +10,7 @@ import (
"sync"
"time"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/go-openapi/strfmt"
@@ -21,6 +21,9 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
"github.com/SigNoz/signoz/pkg/prometheus"
querierV5 "github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types"
@@ -36,7 +39,8 @@ type PrepareTaskOptions struct {
TaskName string
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
Querier querier.Querier
Reader interfaces.Reader
Querier querierV5.Querier
Logger *slog.Logger
Cache cache.Cache
ManagerOpts *ManagerOptions
@@ -49,7 +53,8 @@ type PrepareTestRuleOptions struct {
Rule *ruletypes.PostableRule
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
Querier querier.Querier
Reader interfaces.Reader
Querier querierV5.Querier
Logger *slog.Logger
Cache cache.Cache
ManagerOpts *ManagerOptions
@@ -60,12 +65,19 @@ type PrepareTestRuleOptions struct {
const taskNameSuffix = "webAppEditor"
func RuleIDFromTaskName(n string) string {
func RuleIdFromTaskName(n string) string {
return strings.Split(n, "-groupname")[0]
}
func prepareTaskName(ruleID string) string {
return fmt.Sprintf("%s-groupname", ruleID)
func prepareTaskName(ruleId interface{}) string {
switch ruleId.(type) {
case int, int64:
return fmt.Sprintf("%d-groupname", ruleId)
case string:
return fmt.Sprintf("%s-groupname", ruleId)
default:
return fmt.Sprintf("%v-groupname", ruleId)
}
}
// ManagerOptions bundles options for the Manager.
@@ -76,7 +88,8 @@ type ManagerOptions struct {
Context context.Context
ResendDelay time.Duration
Querier querier.Querier
Reader interfaces.Reader
Querier querierV5.Querier
Logger *slog.Logger
Cache cache.Cache
@@ -85,12 +98,12 @@ type ManagerOptions struct {
RuleStateHistoryModule rulestatehistory.Module
PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, error)
PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
Alertmanager alertmanager.Alertmanager
OrgGetter organization.Getter
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
SQLStore sqlstore.SQLStore
SqlStore sqlstore.SQLStore
QueryParser queryparser.QueryParser
}
@@ -106,9 +119,10 @@ type Manager struct {
maintenanceStore ruletypes.MaintenanceStore
logger *slog.Logger
reader interfaces.Reader
cache cache.Cache
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, error)
prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
@@ -138,7 +152,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
rules := make([]Rule, 0)
var task Task
ruleID := RuleIDFromTaskName(opts.TaskName)
ruleId := RuleIdFromTaskName(opts.TaskName)
evaluation, err := opts.Rule.Evaluation.GetEvaluation()
if err != nil {
@@ -148,9 +162,10 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
if opts.Rule.RuleType == ruletypes.RuleTypeThreshold {
// create a threshold rule
tr, err := NewThresholdRule(
ruleID,
ruleId,
opts.OrgID,
opts.Rule,
opts.Reader,
opts.Querier,
opts.Logger,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
@@ -173,10 +188,11 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
// create promql rule
pr, err := NewPromRule(
ruleID,
ruleId,
opts.OrgID,
opts.Rule,
opts.Logger,
opts.Reader,
opts.ManagerOpts.Prometheus,
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
@@ -194,7 +210,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
task = newTask(TaskTypeProm, opts.TaskName, taskNameSuffix, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
} else {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
return nil, fmt.Errorf("unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
}
return task, nil
@@ -213,12 +229,13 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
opts: o,
block: make(chan struct{}),
logger: o.Logger,
reader: o.Reader,
cache: o.Cache,
prepareTaskFunc: o.PrepareTaskFunc,
prepareTestRuleFunc: o.PrepareTestRuleFunc,
alertmanager: o.Alertmanager,
orgGetter: o.OrgGetter,
sqlstore: o.SQLStore,
sqlstore: o.SqlStore,
queryParser: o.QueryParser,
}
@@ -275,7 +292,6 @@ func (m *Manager) initiate(ctx context.Context) error {
loadErrors = append(loadErrors, err)
continue
}
if parsedRule.NotificationSettings != nil {
config := parsedRule.NotificationSettings.GetAlertManagerNotificationConfig()
err = m.alertmanager.SetNotificationConfig(ctx, org.ID, rec.ID.StringValue(), &config)
@@ -397,6 +413,7 @@ func (m *Manager) editTask(_ context.Context, orgID valuer.UUID, rule *ruletypes
TaskName: taskName,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Reader: m.reader,
Querier: m.opts.Querier,
Logger: m.opts.Logger,
Cache: m.cache,
@@ -444,7 +461,7 @@ func (m *Manager) DeleteRule(ctx context.Context, idStr string) error {
id, err := valuer.NewUUID(idStr)
if err != nil {
m.logger.Error("delete rule received a rule id in invalid format, must be a valid uuid-v7", "id", idStr, errors.Attr(err))
return errors.NewInvalidInputf(errors.CodeInvalidInput, "delete rule received an rule id in invalid format, must be a valid uuid-v7")
return fmt.Errorf("delete rule received an rule id in invalid format, must be a valid uuid-v7")
}
claims, err := authtypes.ClaimsFromContext(ctx)
@@ -509,7 +526,7 @@ func (m *Manager) deleteTask(taskName string) {
if ok {
oldg.Stop()
delete(m.tasks, taskName)
delete(m.rules, RuleIDFromTaskName(taskName))
delete(m.rules, RuleIdFromTaskName(taskName))
m.logger.Debug("rule task deleted", "name", taskName)
} else {
m.logger.Info("rule not found for deletion", "name", taskName)
@@ -605,6 +622,7 @@ func (m *Manager) addTask(_ context.Context, orgID valuer.UUID, rule *ruletypes.
TaskName: taskName,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Reader: m.reader,
Querier: m.opts.Querier,
Logger: m.opts.Logger,
Cache: m.cache,
@@ -626,7 +644,7 @@ func (m *Manager) addTask(_ context.Context, orgID valuer.UUID, rule *ruletypes.
// If there is another task with the same identifier, raise an error
_, ok := m.tasks[taskName]
if ok {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "a rule with the same name already exists")
return fmt.Errorf("a rule with the same name already exists")
}
go func() {
@@ -748,7 +766,7 @@ func (m *Manager) prepareTestNotifyFunc() NotifyFunc {
if len(alerts) == 0 {
return
}
ruleID := alerts[0].Labels.Map()[ruletypes.AlertRuleIDLabel]
ruleID := alerts[0].Labels.Map()[labels.AlertRuleIdLabel]
receiverMap := make(map[*alertmanagertypes.PostableAlert][]string)
for _, alert := range alerts {
generatorURL := alert.GeneratorURL
@@ -757,7 +775,7 @@ func (m *Manager) prepareTestNotifyFunc() NotifyFunc {
a.Annotations = alert.Annotations.Map()
a.StartsAt = strfmt.DateTime(alert.FiredAt)
labelsMap := alert.Labels.Map()
labelsMap[ruletypes.TestAlertLabel] = "true"
labelsMap[labels.TestAlertLabel] = "true"
a.Alert = alertmanagertypes.AlertModel{
Labels: labelsMap,
GeneratorURL: strfmt.URI(generatorURL),
@@ -814,7 +832,7 @@ func (m *Manager) ListRuleStates(ctx context.Context) (*ruletypes.GettableRules,
// fetch state of rule from memory
if rm, ok := m.rules[ruleResponse.Id]; !ok {
ruleResponse.State = ruletypes.StateDisabled
ruleResponse.State = model.StateDisabled
ruleResponse.Disabled = true
} else {
ruleResponse.State = rm.State()
@@ -843,7 +861,7 @@ func (m *Manager) GetRule(ctx context.Context, id valuer.UUID) (*ruletypes.Getta
r.Id = id.StringValue()
// fetch state of rule from memory
if rm, ok := m.rules[r.Id]; !ok {
r.State = ruletypes.StateDisabled
r.State = model.StateDisabled
r.Disabled = true
} else {
r.State = rm.State()
@@ -954,7 +972,7 @@ func (m *Manager) PatchRule(ctx context.Context, ruleStr string, id valuer.UUID)
// fetch state of rule from memory
if rm, ok := m.rules[id.StringValue()]; !ok {
response.State = ruletypes.StateDisabled
response.State = model.StateDisabled
response.Disabled = true
} else {
response.State = rm.State()
@@ -965,11 +983,11 @@ func (m *Manager) PatchRule(ctx context.Context, ruleStr string, id valuer.UUID)
// TestNotification prepares a dummy rule for given rule parameters and
// sends a test notification. returns alert count and error (if any)
func (m *Manager) TestNotification(ctx context.Context, orgID valuer.UUID, ruleStr string) (int, error) {
func (m *Manager) TestNotification(ctx context.Context, orgID valuer.UUID, ruleStr string) (int, *model.ApiError) {
parsedRule := ruletypes.PostableRule{}
err := json.Unmarshal([]byte(ruleStr), &parsedRule)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed to unmarshal rule")
return 0, model.BadRequest(err)
}
if !parsedRule.NotificationSettings.UsePolicy {
parsedRule.NotificationSettings.GroupBy = append(parsedRule.NotificationSettings.GroupBy, ruletypes.LabelThresholdName)
@@ -977,13 +995,17 @@ func (m *Manager) TestNotification(ctx context.Context, orgID valuer.UUID, ruleS
config := parsedRule.NotificationSettings.GetAlertManagerNotificationConfig()
err = m.alertmanager.SetNotificationConfig(ctx, orgID, parsedRule.AlertName, &config)
if err != nil {
return 0, err
return 0, &model.ApiError{
Typ: model.ErrorBadData,
Err: err,
}
}
alertCount, err := m.prepareTestRuleFunc(PrepareTestRuleOptions{
alertCount, apiErr := m.prepareTestRuleFunc(PrepareTestRuleOptions{
Rule: &parsedRule,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Reader: m.reader,
Querier: m.opts.Querier,
Logger: m.opts.Logger,
Cache: m.cache,
@@ -993,5 +1015,83 @@ func (m *Manager) TestNotification(ctx context.Context, orgID valuer.UUID, ruleS
OrgID: orgID,
})
return alertCount, err
return alertCount, apiErr
}
func (m *Manager) GetAlertDetailsForMetricNames(ctx context.Context, metricNames []string) (map[string][]ruletypes.GettableRule, *model.ApiError) {
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
result := make(map[string][]ruletypes.GettableRule)
rules, err := m.ruleStore.GetStoredRules(ctx, claims.OrgID)
if err != nil {
m.logger.ErrorContext(ctx, "error getting stored rules", errors.Attr(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
metricRulesMap := make(map[string][]ruletypes.GettableRule)
for _, storedRule := range rules {
var rule ruletypes.GettableRule
err = json.Unmarshal([]byte(storedRule.Data), &rule)
if err != nil {
m.logger.ErrorContext(ctx, "failed to unmarshal rule from db", "id", storedRule.ID.StringValue(), errors.Attr(err))
continue
}
if rule.AlertType != ruletypes.AlertTypeMetric || rule.RuleCondition == nil || rule.RuleCondition.CompositeQuery == nil {
continue
}
rule.Id = storedRule.ID.StringValue()
rule.CreatedAt = &storedRule.CreatedAt
rule.CreatedBy = &storedRule.CreatedBy
rule.UpdatedAt = &storedRule.UpdatedAt
rule.UpdatedBy = &storedRule.UpdatedBy
for _, query := range rule.RuleCondition.CompositeQuery.BuilderQueries {
if query.AggregateAttribute.Key != "" {
metricRulesMap[query.AggregateAttribute.Key] = append(metricRulesMap[query.AggregateAttribute.Key], rule)
}
}
for _, query := range rule.RuleCondition.CompositeQuery.PromQueries {
if query.Query != "" {
for _, metricName := range metricNames {
if strings.Contains(query.Query, metricName) {
metricRulesMap[metricName] = append(metricRulesMap[metricName], rule)
}
}
}
}
for _, query := range rule.RuleCondition.CompositeQuery.ClickHouseQueries {
if query.Query != "" {
for _, metricName := range metricNames {
if strings.Contains(query.Query, metricName) {
metricRulesMap[metricName] = append(metricRulesMap[metricName], rule)
}
}
}
}
}
for _, metricName := range metricNames {
if rules, exists := metricRulesMap[metricName]; exists {
seen := make(map[string]bool)
uniqueRules := make([]ruletypes.GettableRule, 0)
for _, rule := range rules {
if !seen[rule.Id] {
seen[rule.Id] = true
uniqueRules = append(uniqueRules, rule)
}
}
result[metricName] = uniqueRules
}
}
return result, nil
}

View File

@@ -110,8 +110,11 @@ func TestManager_TestNotification_SendUnmatched_ThresholdRule(t *testing.T) {
},
})
count, err := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
require.Nil(t, err)
count, apiErr := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
if apiErr != nil {
t.Logf("TestNotification error: %v, type: %s", apiErr.Err, apiErr.Typ)
}
require.Nil(t, apiErr)
assert.Equal(t, tc.ExpectAlerts, count)
if tc.ExpectAlerts > 0 {
@@ -206,13 +209,13 @@ func TestManager_TestNotification_SendUnmatched_PromRule(t *testing.T) {
// Create fingerprint data
fingerprint := uint64(12345)
labelsJSON := `{"__name__":"test_metric"}`
fingerprintData := [][]any{
fingerprintData := [][]interface{}{
{fingerprint, labelsJSON},
}
fingerprintRows := cmock.NewRows(fingerprintCols, fingerprintData)
// Create samples data from test case values, calculating timestamps relative to baseTime
validSamplesData := make([][]any, 0)
validSamplesData := make([][]interface{}, 0)
for _, v := range tc.Values {
// Skip NaN and Inf values in the samples data
if math.IsNaN(v.Value) || math.IsInf(v.Value, 0) {
@@ -220,7 +223,7 @@ func TestManager_TestNotification_SendUnmatched_PromRule(t *testing.T) {
}
// Calculate timestamp relative to baseTime
sampleTimestamp := baseTime.Add(v.Offset).UnixMilli()
validSamplesData = append(validSamplesData, []any{
validSamplesData = append(validSamplesData, []interface{}{
"test_metric",
fingerprint,
sampleTimestamp,
@@ -260,8 +263,11 @@ func TestManager_TestNotification_SendUnmatched_PromRule(t *testing.T) {
},
})
count, err := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
require.Nil(t, err)
count, apiErr := mgr.TestNotification(context.Background(), orgID, string(ruleBytes))
if apiErr != nil {
t.Logf("TestNotification error: %v, type: %s", apiErr.Err, apiErr.Typ)
}
require.Nil(t, apiErr)
assert.Equal(t, tc.ExpectAlerts, count)
if tc.ExpectAlerts > 0 {

View File

@@ -4,6 +4,7 @@ import (
"math"
"time"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
@@ -14,7 +15,7 @@ import (
// ThresholdRuleTestCase defines test case structure for threshold rule test notifications
type ThresholdRuleTestCase struct {
Name string
Values [][]any
Values [][]interface{}
ExpectAlerts int
ExpectValue float64
}
@@ -51,11 +52,11 @@ func ThresholdRuleAtLeastOnceValueAbove(target float64, recovery *float64) rulet
},
Version: "v5",
RuleCondition: &ruletypes.RuleCondition{
MatchType: ruletypes.AtleastOnce,
CompareOperator: ruletypes.ValueIsAbove,
Target: &target,
CompositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypeBuilder,
MatchType: ruletypes.AtleastOnce,
CompareOp: ruletypes.ValueIsAbove,
Target: &target,
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
@@ -79,11 +80,11 @@ func ThresholdRuleAtLeastOnceValueAbove(target float64, recovery *float64) rulet
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{
{
Name: "primary",
TargetValue: &target,
RecoveryTarget: recovery,
MatchType: ruletypes.AtleastOnce,
CompareOperator: ruletypes.ValueIsAbove,
Name: "primary",
TargetValue: &target,
RecoveryTarget: recovery,
MatchType: ruletypes.AtleastOnce,
CompareOp: ruletypes.ValueIsAbove,
},
},
},
@@ -110,13 +111,13 @@ func BuildPromAtLeastOnceValueAbove(target float64, recovery *float64) ruletypes
},
Version: "v5",
RuleCondition: &ruletypes.RuleCondition{
MatchType: ruletypes.AtleastOnce,
SelectedQuery: "A",
CompareOperator: ruletypes.ValueIsAbove,
Target: &target,
CompositeQuery: &ruletypes.AlertCompositeQuery{
QueryType: ruletypes.QueryTypePromQL,
PanelType: ruletypes.PanelTypeGraph,
MatchType: ruletypes.AtleastOnce,
SelectedQuery: "A",
CompareOp: ruletypes.ValueIsAbove,
Target: &target,
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypePromQL,
PanelType: v3.PanelTypeGraph,
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypePromQL,
@@ -133,12 +134,12 @@ func BuildPromAtLeastOnceValueAbove(target float64, recovery *float64) ruletypes
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{
{
Name: "primary",
TargetValue: &target,
RecoveryTarget: recovery,
MatchType: ruletypes.AtleastOnce,
CompareOperator: ruletypes.ValueIsAbove,
Channels: []string{"slack"},
Name: "primary",
TargetValue: &target,
RecoveryTarget: recovery,
MatchType: ruletypes.AtleastOnce,
CompareOp: ruletypes.ValueIsAbove,
Channels: []string{"slack"},
},
},
},
@@ -152,7 +153,7 @@ var (
TcTestNotiSendUnmatchedThresholdRule = []ThresholdRuleTestCase{
{
Name: "return first valid point in case of test notification",
Values: [][]any{
Values: [][]interface{}{
{float64(3), "attr", time.Now()},
{float64(4), "attr", time.Now().Add(1 * time.Minute)},
},
@@ -161,12 +162,12 @@ var (
},
{
Name: "No data in DB so no alerts fired",
Values: [][]any{},
Values: [][]interface{}{},
ExpectAlerts: 0,
},
{
Name: "return first valid point in case of test notification skips NaN and Inf",
Values: [][]any{
Values: [][]interface{}{
{math.NaN(), "attr", time.Now()},
{math.Inf(1), "attr", time.Now().Add(1 * time.Minute)},
{float64(7), "attr", time.Now().Add(2 * time.Minute)},
@@ -176,7 +177,7 @@ var (
},
{
Name: "If found matching alert with given target value, return the alerting value rather than first valid point",
Values: [][]any{
Values: [][]interface{}{
{float64(1), "attr", time.Now()},
{float64(2), "attr", time.Now().Add(1 * time.Minute)},
{float64(3), "attr", time.Now().Add(2 * time.Minute)},

View File

@@ -16,6 +16,7 @@ import (
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/querier/signozquerier"
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/sqlstore/sqlstoretest"
"github.com/SigNoz/signoz/pkg/telemetrystore"
@@ -87,7 +88,7 @@ func NewTestManager(t *testing.T, testOpts *TestManagerOptions) *Manager {
}
// Create reader with mocked telemetry store
cache, err := cachetest.New(cache.Config{
readerCache, err := cachetest.New(cache.Config{
Provider: "memory",
Memory: cache.Memory{
NumCounters: 10 * 1000,
@@ -96,16 +97,28 @@ func NewTestManager(t *testing.T, testOpts *TestManagerOptions) *Manager {
})
require.NoError(t, err)
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
providerSettings := instrumentationtest.New().ToProviderSettings()
prometheus := prometheustest.New(context.Background(), providerSettings, prometheus.Config{Timeout: 2 * time.Minute}, telemetryStore)
reader := clickhouseReader.NewReader(
instrumentationtest.New().Logger(),
nil,
telemetryStore,
prometheus,
"",
time.Duration(time.Second),
nil,
readerCache,
options,
)
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
if err != nil {
t.Fatalf("failed to create flagger: %v", err)
}
// Create querier with test values
providerFactory := signozquerier.NewFactory(telemetryStore, prometheus, cache, flagger)
// Create mock querierV5 with test values
providerFactory := signozquerier.NewFactory(telemetryStore, prometheus, readerCache, flagger)
mockQuerier, err := providerFactory.New(context.Background(), providerSettings, querier.Config{})
require.NoError(t, err)
@@ -115,7 +128,8 @@ func NewTestManager(t *testing.T, testOpts *TestManagerOptions) *Manager {
Alertmanager: fAlert,
Querier: mockQuerier,
TelemetryStore: telemetryStore,
SQLStore: sqlStore, // SQLStore needed for SendAlerts to query organizations
Reader: reader,
SqlStore: sqlStore, // SQLStore needed for SendAlerts to query organizations
}
// Call the ManagerOptions hook if provided to allow customization

View File

@@ -12,10 +12,14 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/rulestatehistorytypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/units"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -33,12 +37,13 @@ func NewPromRule(
orgID valuer.UUID,
postableRule *ruletypes.PostableRule,
logger *slog.Logger,
reader interfaces.Reader,
prometheus prometheus.Prometheus,
opts ...RuleOption,
) (*PromRule, error) {
opts = append(opts, WithLogger(logger))
baseRule, err := NewBaseRule(id, orgID, postableRule, opts...)
baseRule, err := NewBaseRule(id, orgID, postableRule, reader, opts...)
if err != nil {
return nil, err
}
@@ -50,7 +55,7 @@ func NewPromRule(
}
p.logger = logger
query, err := p.getPqlQuery(context.Background())
query, err := p.getPqlQuery()
if err != nil {
// can not generate a valid prom QL query
return nil, err
@@ -63,45 +68,72 @@ func (r *PromRule) Type() ruletypes.RuleType {
return ruletypes.RuleTypeProm
}
func (r *PromRule) GetSelectedQuery(ctx context.Context) string {
if r.ruleCondition.SelectedQuery != "" {
return r.ruleCondition.SelectedQuery
func (r *PromRule) GetSelectedQuery() string {
if r.ruleCondition != nil {
// If the user has explicitly set the selected query, we return that.
if r.ruleCondition.SelectedQuery != "" {
return r.ruleCondition.SelectedQuery
}
// Historically, we used to have only one query in the alerts for promql.
// So, if there is only one query, we return that.
// This is to maintain backward compatibility.
// For new rules, we will have to explicitly set the selected query.
return "A"
}
r.logger.WarnContext(ctx, "missing selected query", "rule_name", r.Name())
return r.ruleCondition.SelectedQueryName()
// This should never happen.
return ""
}
func (r *PromRule) getPqlQuery(ctx context.Context) (string, error) {
selectedQuery := r.GetSelectedQuery(ctx)
for _, item := range r.ruleCondition.CompositeQuery.Queries {
switch item.Type {
case qbtypes.QueryTypePromQL:
promQuery, ok := item.Spec.(qbtypes.PromQuery)
if !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", item.Spec)
func (r *PromRule) getPqlQuery() (string, error) {
if r.version == "v5" {
if len(r.ruleCondition.CompositeQuery.Queries) > 0 {
selectedQuery := r.GetSelectedQuery()
for _, item := range r.ruleCondition.CompositeQuery.Queries {
switch item.Type {
case qbtypes.QueryTypePromQL:
promQuery, ok := item.Spec.(qbtypes.PromQuery)
if !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", item.Spec)
}
if promQuery.Name == selectedQuery {
return promQuery.Query, nil
}
}
}
if promQuery.Name == selectedQuery {
return promQuery.Query, nil
}
return "", fmt.Errorf("invalid promql rule setup")
}
if r.ruleCondition.CompositeQuery.QueryType == v3.QueryTypePromQL {
if len(r.ruleCondition.CompositeQuery.PromQueries) > 0 {
selectedQuery := r.GetSelectedQuery()
if promQuery, ok := r.ruleCondition.CompositeQuery.PromQueries[selectedQuery]; ok {
query := promQuery.Query
if query == "" {
return query, fmt.Errorf("a promquery needs to be set for this rule to function")
}
return query, nil
}
}
}
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql rule setup")
return "", fmt.Errorf("invalid promql rule query")
}
func (r *PromRule) matrixToCommonSeries(res promql.Matrix) []*qbtypes.TimeSeries {
seriesSlice := make([]*qbtypes.TimeSeries, 0, len(res))
func (r *PromRule) matrixToV3Series(res promql.Matrix) []*v3.Series {
v3Series := make([]*v3.Series, 0, len(res))
for _, series := range res {
commonSeries := toCommonSeries(series)
seriesSlice = append(seriesSlice, commonSeries)
v3Series = append(v3Series, &commonSeries)
}
return seriesSlice
return v3Series
}
func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletypes.Vector, error) {
start, end := r.Timestamps(ts)
interval := 60 * time.Second // TODO(srikanthccv): this should be configurable
q, err := r.getPqlQuery(ctx)
q, err := r.getPqlQuery()
if err != nil {
return nil, err
}
@@ -113,7 +145,7 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
return nil, err
}
matrixToProcess := r.matrixToCommonSeries(res)
matrixToProcess := r.matrixToV3Series(res)
hasData := len(matrixToProcess) > 0
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
@@ -137,11 +169,11 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(
ctx, "not enough data points to evaluate series, skipping",
"rule_id", r.ID(), "num_points", len(series.Values), "required_points", r.Condition().RequiredNumPoints,
"rule_id", r.ID(), "num_points", len(series.Points), "required_points", r.Condition().RequiredNumPoints,
)
continue
}
resultSeries, err := r.Threshold.Eval(series, r.Unit(), ruletypes.EvalData{
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
SendUnmatched: r.ShouldSendUnmatched(),
})
@@ -196,6 +228,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
defs+text,
"__alert_"+r.Name(),
tmplData,
times.Time(timestamp.FromTime(ts)),
nil,
)
result, err := tmpl.Expand()
@@ -206,24 +239,24 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
return result
}
lb := ruletypes.NewBuilder(result.Metric...).Del(ruletypes.MetricNameLabel)
resultLabels := ruletypes.NewBuilder(result.Metric...).Del(ruletypes.MetricNameLabel).Labels()
lb := qslabels.NewBuilder(result.Metric).Del(qslabels.MetricNameLabel)
resultLabels := qslabels.NewBuilder(result.Metric).Del(qslabels.MetricNameLabel).Labels()
for name, value := range r.labels.Map() {
lb.Set(name, expand(value))
}
lb.Set(ruletypes.AlertNameLabel, r.Name())
lb.Set(ruletypes.AlertRuleIDLabel, r.ID())
lb.Set(ruletypes.RuleSourceLabel, r.GeneratorURL())
lb.Set(qslabels.AlertNameLabel, r.Name())
lb.Set(qslabels.AlertRuleIdLabel, r.ID())
lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
annotations := make(ruletypes.Labels, 0, len(r.annotations.Map()))
annotations := make(qslabels.Labels, 0, len(r.annotations.Map()))
for name, value := range r.annotations.Map() {
annotations = append(annotations, ruletypes.Label{Name: name, Value: expand(value)})
annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)})
}
if result.IsMissing {
lb.Set(ruletypes.AlertNameLabel, "[No data] "+r.Name())
lb.Set(ruletypes.NoDataLabel, "true")
lb.Set(qslabels.AlertNameLabel, "[No data] "+r.Name())
lb.Set(qslabels.NoDataLabel, "true")
}
lbs := lb.Labels()
@@ -231,7 +264,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
err = errors.NewInternalf(errors.CodeInternal, "vector contains metrics with the same labelset after applying alert labels")
err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
// We have already acquired the lock above hence using SetHealth and
// SetLastError will deadlock.
r.health = ruletypes.HealthBad
@@ -240,10 +273,10 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
}
alerts[h] = &ruletypes.Alert{
Labels: lbs,
QueryResultLabels: resultLabels,
QueryResultLables: resultLabels,
Annotations: annotations,
ActiveAt: ts,
State: ruletypes.StatePending,
State: model.StatePending,
Value: result.V,
GeneratorURL: r.GeneratorURL(),
Receivers: ruleReceiverMap[lbs.Map()[ruletypes.LabelThresholdName]],
@@ -257,7 +290,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.Active[h]; ok && alert.State != ruletypes.StateInactive {
if alert, ok := r.Active[h]; ok && alert.State != model.StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
// Update the recovering and missing state of existing alert
@@ -273,75 +306,75 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
}
itemsToAdd := []rulestatehistorytypes.RuleStateHistory{}
itemsToAdd := []model.RuleStateHistory{}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.Active {
labelsJSON, err := json.Marshal(a.QueryResultLabels)
labelsJSON, err := json.Marshal(a.QueryResultLables)
if err != nil {
r.logger.ErrorContext(ctx, "error marshaling labels", errors.Attr(err), "rule_name", r.Name())
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == ruletypes.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > ruletypes.ResolvedRetention) {
if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > ruletypes.ResolvedRetention) {
delete(r.Active, fp)
}
if a.State != ruletypes.StateInactive {
a.State = ruletypes.StateInactive
if a.State != model.StateInactive {
a.State = model.StateInactive
a.ResolvedAt = ts
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: ruletypes.StateInactive,
State: model.StateInactive,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: rulestatehistorytypes.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLabels.Hash(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
})
}
continue
}
if a.State == ruletypes.StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration.Duration() {
a.State = ruletypes.StateFiring
if a.State == model.StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration.Duration() {
a.State = model.StateFiring
a.FiredAt = ts
state := ruletypes.StateFiring
state := model.StateFiring
if a.Missing {
state = ruletypes.StateNoData
state = model.StateNoData
}
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: state,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: rulestatehistorytypes.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLabels.Hash(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}
// We need to change firing alert to recovering if the returned sample meets recovery threshold
changeAlertingToRecovering := a.State == ruletypes.StateFiring && a.IsRecovering
changeAlertingToRecovering := a.State == model.StateFiring && a.IsRecovering
// We need to change recovering alerts to firing if the returned sample meets target threshold
changeRecoveringToFiring := a.State == ruletypes.StateRecovering && !a.IsRecovering && !a.Missing
changeRecoveringToFiring := a.State == model.StateRecovering && !a.IsRecovering && !a.Missing
// in any of the above case we need to update the status of alert
if changeAlertingToRecovering || changeRecoveringToFiring {
state := ruletypes.StateRecovering
state := model.StateRecovering
if changeRecoveringToFiring {
state = ruletypes.StateFiring
state = model.StateFiring
}
a.State = state
r.logger.DebugContext(ctx, "converting alert state", "name", r.Name(), "state", state)
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: state,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: rulestatehistorytypes.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLabels.Hash(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}
@@ -419,25 +452,26 @@ func (r *PromRule) RunAlertQuery(ctx context.Context, qs string, start, end time
case promql.Matrix:
return res.Value.(promql.Matrix), nil
default:
return nil, errors.NewInternalf(errors.CodeInternal, "rule result is not a vector or scalar")
return nil, fmt.Errorf("rule result is not a vector or scalar")
}
}
func toCommonSeries(series promql.Series) *qbtypes.TimeSeries {
commonSeries := &qbtypes.TimeSeries{
Labels: make([]*qbtypes.Label, 0),
Values: make([]*qbtypes.TimeSeriesValue, 0),
func toCommonSeries(series promql.Series) v3.Series {
commonSeries := v3.Series{
Labels: make(map[string]string),
LabelsArray: make([]map[string]string, 0),
Points: make([]v3.Point, 0),
}
series.Metric.Range(func(lbl labels.Label) {
commonSeries.Labels = append(commonSeries.Labels, &qbtypes.Label{
Key: telemetrytypes.TelemetryFieldKey{Name: lbl.Name},
Value: lbl.Value,
commonSeries.Labels[lbl.Name] = lbl.Value
commonSeries.LabelsArray = append(commonSeries.LabelsArray, map[string]string{
lbl.Name: lbl.Value,
})
})
for _, f := range series.Floats {
commonSeries.Values = append(commonSeries.Values, &qbtypes.TimeSeriesValue{
commonSeries.Points = append(commonSeries.Points, v3.Point{
Timestamp: f.T,
Value: f.F,
})

View File

@@ -2,6 +2,7 @@ package rules
import (
"context"
"fmt"
"sort"
"sync"
"time"
@@ -186,7 +187,7 @@ func (g *PromRuleTask) PromRules() []*PromRule {
}
}
sort.Slice(alerts, func(i, j int) bool {
return alerts[i].State().Severity() > alerts[j].State().Severity() ||
return alerts[i].State() > alerts[j].State() ||
(alerts[i].State() == alerts[j].State() &&
alerts[i].Name() < alerts[j].Name())
})
@@ -267,7 +268,7 @@ func (g *PromRuleTask) CopyState(fromTask Task) error {
from, ok := fromTask.(*PromRuleTask)
if !ok {
return errors.NewInternalf(errors.CodeInternal, "you can only copy rule groups with same type")
return fmt.Errorf("you can only copy rule groups with same type")
}
g.evaluationTime = from.evaluationTime

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1 @@
package rules

View File

@@ -4,7 +4,8 @@ import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/types/rulestatehistorytypes"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -16,13 +17,13 @@ type Rule interface {
Name() string
Type() ruletypes.RuleType
Labels() ruletypes.Labels
Annotations() ruletypes.Labels
Labels() labels.BaseLabels
Annotations() labels.BaseLabels
Condition() *ruletypes.RuleCondition
EvalDelay() valuer.TextDuration
EvalWindow() valuer.TextDuration
HoldDuration() valuer.TextDuration
State() ruletypes.AlertState
State() model.AlertState
ActiveAlerts() []*ruletypes.Alert
// ActiveAlertsLabelFP returns a map of active alert labels fingerprint
ActiveAlertsLabelFP() map[uint64]struct{}
@@ -41,17 +42,7 @@ type Rule interface {
SetEvaluationTimestamp(time.Time)
GetEvaluationTimestamp() time.Time
RecordRuleStateHistory(
ctx context.Context,
prevState, currentState ruletypes.AlertState,
itemsToAdd []rulestatehistorytypes.RuleStateHistory,
) error
RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []model.RuleStateHistory) error
SendAlerts(
ctx context.Context,
ts time.Time,
resendDelay time.Duration,
interval time.Duration,
notifyFunc NotifyFunc,
)
SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc)
}

View File

@@ -2,6 +2,7 @@ package rules
import (
"context"
"fmt"
"sort"
"sync"
"time"
@@ -11,9 +12,10 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -93,7 +95,7 @@ func (g *RuleTask) Pause(b bool) {
type QueryOrigin struct{}
func NewQueryOriginContext(ctx context.Context, data map[string]any) context.Context {
func NewQueryOriginContext(ctx context.Context, data map[string]interface{}) context.Context {
return context.WithValue(ctx, QueryOrigin{}, data)
}
@@ -109,7 +111,7 @@ func (g *RuleTask) Run(ctx context.Context) {
return
}
ctx = NewQueryOriginContext(ctx, map[string]any{
ctx = NewQueryOriginContext(ctx, map[string]interface{}{
"ruleRuleTask": map[string]string{
"name": g.Name(),
},
@@ -161,8 +163,8 @@ func (g *RuleTask) Stop() {
}
func (g *RuleTask) hash() uint64 {
l := ruletypes.New(
ruletypes.Label{Name: "name", Value: g.name},
l := labels.New(
labels.Label{Name: "name", Value: g.name},
)
return l.Hash()
}
@@ -178,7 +180,7 @@ func (g *RuleTask) ThresholdRules() []*ThresholdRule {
}
}
sort.Slice(alerts, func(i, j int) bool {
return alerts[i].State().Severity() > alerts[j].State().Severity() ||
return alerts[i].State() > alerts[j].State() ||
(alerts[i].State() == alerts[j].State() &&
alerts[i].Name() < alerts[j].Name())
})
@@ -263,7 +265,7 @@ func (g *RuleTask) CopyState(fromTask Task) error {
from, ok := fromTask.(*RuleTask)
if !ok {
return errors.NewInternalf(errors.CodeInternal, "invalid from task for copy")
return fmt.Errorf("invalid from task for copy")
}
g.evaluationTime = from.evaluationTime
g.lastEvaluation = from.lastEvaluation

View File

@@ -1,162 +0,0 @@
package rules
import (
"context"
"testing"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
)
func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.TelemetryStore) querier.Querier {
providerSettings := instrumentationtest.New().ToProviderSettings()
metadataStore := telemetrytypestest.NewMockMetadataStore()
flagger, err := flagger.New(
context.Background(),
instrumentationtest.New().ToProviderSettings(),
flagger.Config{},
flagger.MustNewRegistry(),
)
require.NoError(t, err)
metricFieldMapper := telemetrymetrics.NewFieldMapper()
metricConditionBuilder := telemetrymetrics.NewConditionBuilder(metricFieldMapper)
metricStmtBuilder := telemetrymetrics.NewMetricQueryStatementBuilder(
providerSettings,
metadataStore,
metricFieldMapper,
metricConditionBuilder,
flagger,
)
return querier.New(
providerSettings,
telemetryStore,
metadataStore,
nil, // prometheus
nil, // traceStmtBuilder
nil, // logStmtBuilder
metricStmtBuilder,
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
)
}
func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap map[string][]*telemetrytypes.TelemetryFieldKey) querier.Querier {
providerSettings := instrumentationtest.New().ToProviderSettings()
metadataStore := telemetrytypestest.NewMockMetadataStore()
for _, keys := range keysMap {
for _, key := range keys {
key.Signal = telemetrytypes.SignalLogs
}
}
metadataStore.KeysMap = keysMap
resourceFilterFieldMapper := resourcefilter.NewFieldMapper()
resourceFilterConditionBuilder := resourcefilter.NewConditionBuilder(resourceFilterFieldMapper)
logFieldMapper := telemetrylogs.NewFieldMapper()
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
logResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
providerSettings,
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
metadataStore,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.GetBodyJSONKey,
)
logAggExprRewriter := querybuilder.NewAggExprRewriter(
providerSettings,
telemetrylogs.DefaultFullTextColumn,
logFieldMapper,
logConditionBuilder,
telemetrylogs.GetBodyJSONKey,
)
logStmtBuilder := telemetrylogs.NewLogQueryStatementBuilder(
providerSettings,
metadataStore,
logFieldMapper,
logConditionBuilder,
logResourceFilterStmtBuilder,
logAggExprRewriter,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.GetBodyJSONKey,
)
return querier.New(
providerSettings,
telemetryStore,
metadataStore,
nil, // prometheus
nil, // traceStmtBuilder
logStmtBuilder, // logStmtBuilder
nil, // metricStmtBuilder
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
)
}
func prepareQuerierForTraces(telemetryStore telemetrystore.TelemetryStore, keysMap map[string][]*telemetrytypes.TelemetryFieldKey) querier.Querier {
providerSettings := instrumentationtest.New().ToProviderSettings()
metadataStore := telemetrytypestest.NewMockMetadataStore()
for _, keys := range keysMap {
for _, key := range keys {
key.Signal = telemetrytypes.SignalTraces
}
}
metadataStore.KeysMap = keysMap
// Create trace statement builder
traceFieldMapper := telemetrytraces.NewFieldMapper()
traceConditionBuilder := telemetrytraces.NewConditionBuilder(traceFieldMapper)
resourceFilterFieldMapper := resourcefilter.NewFieldMapper()
resourceFilterConditionBuilder := resourcefilter.NewConditionBuilder(resourceFilterFieldMapper)
resourceFilterStmtBuilder := resourcefilter.NewTraceResourceFilterStatementBuilder(
providerSettings,
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
metadataStore,
)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(providerSettings, nil, traceFieldMapper, traceConditionBuilder, nil)
traceStmtBuilder := telemetrytraces.NewTraceQueryStatementBuilder(
providerSettings,
metadataStore,
traceFieldMapper,
traceConditionBuilder,
resourceFilterStmtBuilder,
traceAggExprRewriter,
telemetryStore,
)
return querier.New(
providerSettings,
telemetryStore,
metadataStore,
nil, // prometheus
traceStmtBuilder, // traceStmtBuilder
nil, // logStmtBuilder
nil, // metricStmtBuilder
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
)
}

View File

@@ -4,7 +4,7 @@ import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)

View File

@@ -10,16 +10,18 @@ import (
"github.com/google/uuid"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
)
// TestNotification prepares a dummy rule for given rule parameters and
// sends a test notification. returns alert count and error (if any)
func defaultTestNotification(opts PrepareTestRuleOptions) (int, error) {
func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError) {
ctx := context.Background()
if opts.Rule == nil {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "rule is required")
return 0, model.BadRequest(fmt.Errorf("rule is required"))
}
parsedRule := opts.Rule
@@ -39,14 +41,15 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, error) {
if parsedRule.RuleType == ruletypes.RuleTypeThreshold {
// add special labels for test alerts
parsedRule.Labels[ruletypes.RuleSourceLabel] = ""
parsedRule.Labels[ruletypes.AlertRuleIDLabel] = ""
parsedRule.Labels[labels.RuleSourceLabel] = ""
parsedRule.Labels[labels.AlertRuleIdLabel] = ""
// create a threshold rule
rule, err = NewThresholdRule(
alertname,
opts.OrgID,
parsedRule,
opts.Reader,
opts.Querier,
opts.Logger,
WithSendAlways(),
@@ -58,7 +61,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, error) {
if err != nil {
slog.Error("failed to prepare a new threshold rule for test", errors.Attr(err))
return 0, err
return 0, model.BadRequest(err)
}
} else if parsedRule.RuleType == ruletypes.RuleTypeProm {
@@ -69,6 +72,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, error) {
opts.OrgID,
parsedRule,
opts.Logger,
opts.Reader,
opts.ManagerOpts.Prometheus,
WithSendAlways(),
WithSendUnmatched(),
@@ -79,10 +83,10 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, error) {
if err != nil {
slog.Error("failed to prepare a new promql rule for test", errors.Attr(err))
return 0, err
return 0, model.BadRequest(err)
}
} else {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid rule type")
return 0, model.BadRequest(fmt.Errorf("failed to derive ruletype with given information"))
}
// set timestamp to current utc time
@@ -90,8 +94,8 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, error) {
alertsFound, err := rule.Eval(ctx, ts)
if err != nil {
slog.Error("evaluating rule failed", "rule_name", rule.Name(), errors.Attr(err))
return 0, err
slog.Error("evaluating rule failed", "rule", rule.Name(), errors.Attr(err))
return 0, model.InternalError(fmt.Errorf("rule evaluation failed"))
}
rule.SendAlerts(ctx, ts, 0, time.Duration(1*time.Minute), opts.NotifyFunc)

View File

@@ -1,33 +1,67 @@
package rules
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"math"
"net/url"
"reflect"
"text/template"
"time"
"github.com/SigNoz/signoz/pkg/contextlinks"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/transition"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
"github.com/SigNoz/signoz/pkg/types/rulestatehistorytypes"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/query-service/app/querier"
querierV2 "github.com/SigNoz/signoz/pkg/query-service/app/querier/v2"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
querytemplate "github.com/SigNoz/signoz/pkg/query-service/utils/queryTemplate"
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
logsv3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/units"
querierV5 "github.com/SigNoz/signoz/pkg/querier"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
type ThresholdRule struct {
*BaseRule
// Ever since we introduced the new metrics query builder, the version is "v4"
// for all the rules
// if the version is "v3", then we use the old querier
// if the version is "v4", then we use the new querierV2
version string
querier querier.Querier
// querier is used for alerts created before the introduction of new metrics query builder
querier interfaces.Querier
// querierV2 is used for alerts created after the introduction of new metrics query builder
querierV2 interfaces.Querier
// querierV5 is used for alerts migrated after the introduction of new query builder
querierV5 querierV5.Querier
// used for attribute metadata enrichment for logs and traces
logsKeys map[string]v3.AttributeKey
spansKeys map[string]v3.AttributeKey
}
var _ Rule = (*ThresholdRule)(nil)
@@ -36,7 +70,8 @@ func NewThresholdRule(
id string,
orgID valuer.UUID,
p *ruletypes.PostableRule,
querier querier.Querier,
reader interfaces.Reader,
querierV5 querierV5.Querier,
logger *slog.Logger,
opts ...RuleOption,
) (*ThresholdRule, error) {
@@ -44,33 +79,211 @@ func NewThresholdRule(
opts = append(opts, WithLogger(logger))
baseRule, err := NewBaseRule(id, orgID, p, opts...)
baseRule, err := NewBaseRule(id, orgID, p, reader, opts...)
if err != nil {
return nil, err
}
return &ThresholdRule{
t := ThresholdRule{
BaseRule: baseRule,
querier: querier,
}, nil
version: p.Version,
}
querierOption := querier.QuerierOptions{
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
}
t.querier = querier.NewQuerier(querierOption)
t.querierV2 = querierV2.NewQuerier(querierOptsV2)
t.querierV5 = querierV5
t.reader = reader
return &t, nil
}
func (r *ThresholdRule) hostFromSource() string {
parsedURL, err := url.Parse(r.source)
parsedUrl, err := url.Parse(r.source)
if err != nil {
return ""
}
if parsedURL.Port() != "" {
return fmt.Sprintf("%s://%s:%s", parsedURL.Scheme, parsedURL.Hostname(), parsedURL.Port())
if parsedUrl.Port() != "" {
return fmt.Sprintf("%s://%s:%s", parsedUrl.Scheme, parsedUrl.Hostname(), parsedUrl.Port())
}
return fmt.Sprintf("%s://%s", parsedURL.Scheme, parsedURL.Hostname())
return fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Hostname())
}
func (r *ThresholdRule) Type() ruletypes.RuleType {
return ruletypes.RuleTypeThreshold
}
func (r *ThresholdRule) prepareQueryRange(ctx context.Context, ts time.Time) (*qbtypes.QueryRangeRequest, error) {
func (r *ThresholdRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v3.QueryRangeParamsV3, error) {
r.logger.InfoContext(
ctx, "prepare query range request v4", "ts", ts.UnixMilli(), "eval_window", r.evalWindow.Milliseconds(), "eval_delay", r.evalDelay.Milliseconds(),
)
startTs, endTs := r.Timestamps(ts)
start, end := startTs.UnixMilli(), endTs.UnixMilli()
if r.ruleCondition.QueryType() == v3.QueryTypeClickHouseSQL {
params := &v3.QueryRangeParamsV3{
Start: start,
End: end,
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
CompositeQuery: &v3.CompositeQuery{
QueryType: r.ruleCondition.CompositeQuery.QueryType,
PanelType: r.ruleCondition.CompositeQuery.PanelType,
BuilderQueries: make(map[string]*v3.BuilderQuery),
ClickHouseQueries: make(map[string]*v3.ClickHouseQuery),
PromQueries: make(map[string]*v3.PromQuery),
Unit: r.ruleCondition.CompositeQuery.Unit,
},
Variables: make(map[string]interface{}),
NoCache: true,
}
querytemplate.AssignReservedVarsV3(params)
for name, chQuery := range r.ruleCondition.CompositeQuery.ClickHouseQueries {
if chQuery.Disabled {
continue
}
tmpl := template.New("clickhouse-query")
tmpl, err := tmpl.Parse(chQuery.Query)
if err != nil {
return nil, err
}
var query bytes.Buffer
err = tmpl.Execute(&query, params.Variables)
if err != nil {
return nil, err
}
params.CompositeQuery.ClickHouseQueries[name] = &v3.ClickHouseQuery{
Query: query.String(),
Disabled: chQuery.Disabled,
Legend: chQuery.Legend,
}
}
return params, nil
}
if r.ruleCondition.CompositeQuery != nil && r.ruleCondition.CompositeQuery.BuilderQueries != nil {
for _, q := range r.ruleCondition.CompositeQuery.BuilderQueries {
// If the step interval is less than the minimum allowed step interval, set it to the minimum allowed step interval
if minStep := common.MinAllowedStepInterval(start, end); q.StepInterval < minStep {
q.StepInterval = minStep
}
q.SetShiftByFromFunc()
if q.DataSource == v3.DataSourceMetrics {
// if the time range is greater than 1 day, and less than 1 week set the step interval to be multiple of 5 minutes
// if the time range is greater than 1 week, set the step interval to be multiple of 30 mins
if end-start >= 24*time.Hour.Milliseconds() && end-start < 7*24*time.Hour.Milliseconds() {
q.StepInterval = int64(math.Round(float64(q.StepInterval)/300)) * 300
} else if end-start >= 7*24*time.Hour.Milliseconds() {
q.StepInterval = int64(math.Round(float64(q.StepInterval)/1800)) * 1800
}
}
}
}
if r.ruleCondition.CompositeQuery.PanelType != v3.PanelTypeGraph {
r.ruleCondition.CompositeQuery.PanelType = v3.PanelTypeGraph
}
// default mode
return &v3.QueryRangeParamsV3{
Start: start,
End: end,
Step: int64(math.Max(float64(common.MinAllowedStepInterval(start, end)), 60)),
CompositeQuery: r.ruleCondition.CompositeQuery,
Variables: make(map[string]interface{}),
NoCache: true,
}, nil
}
func (r *ThresholdRule) prepareLinksToLogs(ctx context.Context, ts time.Time, lbls labels.Labels) string {
if r.version == "v5" {
return r.prepareLinksToLogsV5(ctx, ts, lbls)
}
selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRange(ctx, ts)
if err != nil {
return ""
}
start := time.UnixMilli(qr.Start)
end := time.UnixMilli(qr.End)
// TODO(srikanthccv): handle formula queries
if selectedQuery < "A" || selectedQuery > "Z" {
return ""
}
q := r.ruleCondition.CompositeQuery.BuilderQueries[selectedQuery]
if q == nil {
return ""
}
if q.DataSource != v3.DataSourceLogs {
return ""
}
queryFilter := []v3.FilterItem{}
if q.Filters != nil {
queryFilter = q.Filters.Items
}
filterItems := contextlinks.PrepareFilters(lbls.Map(), queryFilter, q.GroupBy, r.logsKeys)
return contextlinks.PrepareLinksToLogs(start, end, filterItems)
}
func (r *ThresholdRule) prepareLinksToTraces(ctx context.Context, ts time.Time, lbls labels.Labels) string {
if r.version == "v5" {
return r.prepareLinksToTracesV5(ctx, ts, lbls)
}
selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRange(ctx, ts)
if err != nil {
return ""
}
start := time.UnixMilli(qr.Start)
end := time.UnixMilli(qr.End)
// TODO(srikanthccv): handle formula queries
if selectedQuery < "A" || selectedQuery > "Z" {
return ""
}
q := r.ruleCondition.CompositeQuery.BuilderQueries[selectedQuery]
if q == nil {
return ""
}
if q.DataSource != v3.DataSourceTraces {
return ""
}
queryFilter := []v3.FilterItem{}
if q.Filters != nil {
queryFilter = q.Filters.Items
}
filterItems := contextlinks.PrepareFilters(lbls.Map(), queryFilter, q.GroupBy, r.spansKeys)
return contextlinks.PrepareLinksToTraces(start, end, filterItems)
}
func (r *ThresholdRule) prepareQueryRangeV5(ctx context.Context, ts time.Time) (*qbtypes.QueryRangeRequest, error) {
r.logger.InfoContext(
ctx, "prepare query range request v5", "ts", ts.UnixMilli(), "eval_window", r.evalWindow.Milliseconds(), "eval_delay", r.evalDelay.Milliseconds(),
)
@@ -92,10 +305,10 @@ func (r *ThresholdRule) prepareQueryRange(ctx context.Context, ts time.Time) (*q
return req, nil
}
func (r *ThresholdRule) prepareLinksToLogs(ctx context.Context, ts time.Time, lbls ruletypes.Labels) string {
selectedQuery := r.GetSelectedQuery(ctx)
func (r *ThresholdRule) prepareLinksToLogsV5(ctx context.Context, ts time.Time, lbls labels.Labels) string {
selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRange(ctx, ts)
qr, err := r.prepareQueryRangeV5(ctx, ts)
if err != nil {
return ""
}
@@ -132,10 +345,10 @@ func (r *ThresholdRule) prepareLinksToLogs(ctx context.Context, ts time.Time, lb
return contextlinks.PrepareLinksToLogsV5(start, end, whereClause)
}
func (r *ThresholdRule) prepareLinksToTraces(ctx context.Context, ts time.Time, lbls ruletypes.Labels) string {
selectedQuery := r.GetSelectedQuery(ctx)
func (r *ThresholdRule) prepareLinksToTracesV5(ctx context.Context, ts time.Time, lbls labels.Labels) string {
selectedQuery := r.GetSelectedQuery()
qr, err := r.prepareQueryRange(ctx, ts)
qr, err := r.prepareQueryRangeV5(ctx, ts)
if err != nil {
return ""
}
@@ -172,12 +385,8 @@ func (r *ThresholdRule) prepareLinksToTraces(ctx context.Context, ts time.Time,
return contextlinks.PrepareLinksToTracesV5(start, end, whereClause)
}
func (r *ThresholdRule) GetSelectedQuery(ctx context.Context) string {
if r.ruleCondition.SelectedQuery != "" {
return r.ruleCondition.SelectedQuery
}
r.logger.WarnContext(ctx, "missing selected query", "rule_name", r.Name())
return r.ruleCondition.SelectedQueryName()
func (r *ThresholdRule) GetSelectedQuery() string {
return r.ruleCondition.GetSelectedQueryName()
}
func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
@@ -185,31 +394,74 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
if err != nil {
return nil, err
}
err = r.PopulateTemporality(ctx, orgID, params)
if err != nil {
return nil, fmt.Errorf("internal error while setting temporality")
}
var results []*qbtypes.TimeSeriesData
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
hasLogsQuery := false
hasTracesQuery := false
for _, query := range params.CompositeQuery.BuilderQueries {
if query.DataSource == v3.DataSourceLogs {
hasLogsQuery = true
}
if query.DataSource == v3.DataSourceTraces {
hasTracesQuery = true
}
}
if hasLogsQuery {
// check if any enrichment is required for logs if yes then enrich them
if logsv3.EnrichmentRequired(params) {
logsFields, apiErr := r.reader.GetLogFieldsFromNames(ctx, logsv3.GetFieldNames(params.CompositeQuery))
if apiErr != nil {
return nil, apiErr.ToError()
}
logsKeys := model.GetLogFieldsV3(ctx, params, logsFields)
r.logsKeys = logsKeys
logsv3.Enrich(params, logsKeys)
}
}
if hasTracesQuery {
spanKeys, err := r.reader.GetSpanAttributeKeysByNames(ctx, logsv3.GetFieldNames(params.CompositeQuery))
if err != nil {
return nil, err
}
r.spansKeys = spanKeys
tracesV4.Enrich(params, spanKeys)
}
}
var results []*v3.Result
var queryErrors map[string]error
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "rules",
instrumentationtypes.CodeFunctionName: "buildAndRunQuery",
})
v5Result, err := r.querier.QueryRange(ctx, orgID, params)
if err != nil {
return nil, err
if r.version == "v4" {
results, queryErrors, err = r.querierV2.QueryRange(ctx, orgID, params)
} else {
results, queryErrors, err = r.querier.QueryRange(ctx, orgID, params)
}
for _, item := range v5Result.Data.Results {
if tsData, ok := item.(*qbtypes.TimeSeriesData); ok {
results = append(results, tsData)
} else {
// NOTE: should not happen but just to ensure we don't miss it if it happens for some reason
r.logger.WarnContext(ctx, "expected qbtypes.TimeSeriesData but got", "item_type", reflect.TypeOf(item))
if err != nil {
r.logger.ErrorContext(ctx, "failed to get alert query range result", "rule_name", r.Name(), errors.Attr(err), "query_errors", queryErrors)
return nil, fmt.Errorf("internal error while querying")
}
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
results, err = postprocess.PostProcessResult(results, params)
if err != nil {
r.logger.ErrorContext(ctx, "failed to post process result", "rule_name", r.Name(), errors.Attr(err))
return nil, fmt.Errorf("internal error while post processing")
}
}
selectedQuery := r.GetSelectedQuery(ctx)
selectedQuery := r.GetSelectedQuery()
var queryResult *qbtypes.TimeSeriesData
var queryResult *v3.Result
for _, res := range results {
if res.QueryName == selectedQuery {
queryResult = res
@@ -217,24 +469,88 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
}
}
hasData := queryResult != nil &&
len(queryResult.Aggregations) > 0 &&
queryResult.Aggregations[0] != nil &&
len(queryResult.Aggregations[0].Series) > 0
hasData := queryResult != nil && len(queryResult.Series) > 0
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
return ruletypes.Vector{*missingDataAlert}, nil
}
var resultVector ruletypes.Vector
if queryResult == nil || len(queryResult.Aggregations) == 0 || queryResult.Aggregations[0] == nil {
if queryResult == nil {
r.logger.WarnContext(ctx, "query result is nil", "rule_name", r.Name(), "query_name", selectedQuery)
return resultVector, nil
}
for _, series := range queryResult.Series {
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
SendUnmatched: r.ShouldSendUnmatched(),
})
if err != nil {
return nil, err
}
resultVector = append(resultVector, resultSeries...)
}
return resultVector, nil
}
func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRangeV5(ctx, ts)
if err != nil {
return nil, err
}
var results []*v3.Result
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "rules",
instrumentationtypes.CodeFunctionName: "buildAndRunQueryV5",
})
v5Result, err := r.querierV5.QueryRange(ctx, orgID, params)
if err != nil {
r.logger.ErrorContext(ctx, "failed to get alert query result", "rule_name", r.Name(), errors.Attr(err))
return nil, fmt.Errorf("internal error while querying")
}
for _, item := range v5Result.Data.Results {
if tsData, ok := item.(*qbtypes.TimeSeriesData); ok {
results = append(results, transition.ConvertV5TimeSeriesDataToV4Result(tsData))
} else {
// NOTE: should not happen but just to ensure we don't miss it if it happens for some reason
r.logger.WarnContext(ctx, "expected qbtypes.TimeSeriesData but got", "item_type", reflect.TypeOf(item))
}
}
selectedQuery := r.GetSelectedQuery()
var queryResult *v3.Result
for _, res := range results {
if res.QueryName == selectedQuery {
queryResult = res
break
}
}
hasData := queryResult != nil && len(queryResult.Series) > 0
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
return ruletypes.Vector{*missingDataAlert}, nil
}
var resultVector ruletypes.Vector
if queryResult == nil {
r.logger.WarnContext(ctx, "query result is nil", "rule_name", r.Name(), "query_name", selectedQuery)
return resultVector, nil
}
// Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.Aggregations[0].Series
seriesToProcess := queryResult.Series
if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
// In case of error we log the error and continue with the original series
@@ -247,10 +563,10 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
for _, series := range seriesToProcess {
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "rule_id", r.ID(), "num_points", len(series.Values), "required_points", r.Condition().RequiredNumPoints)
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)
continue
}
resultSeries, err := r.Threshold.Eval(series, r.Unit(), ruletypes.EvalData{
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
SendUnmatched: r.ShouldSendUnmatched(),
})
@@ -271,7 +587,13 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
var res ruletypes.Vector
var err error
res, err = r.buildAndRunQuery(ctx, r.orgID, ts)
if r.version == "v5" {
r.logger.InfoContext(ctx, "running v5 query")
res, err = r.buildAndRunQueryV5(ctx, r.orgID, ts)
} else {
r.logger.InfoContext(ctx, "running v4 query")
res, err = r.buildAndRunQuery(ctx, r.orgID, ts)
}
if err != nil {
return 0, err
@@ -312,6 +634,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
defs+text,
"__alert_"+r.Name(),
tmplData,
times.Time(timestamp.FromTime(ts)),
nil,
)
result, err := tmpl.Expand()
@@ -322,24 +645,24 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
return result
}
lb := ruletypes.NewBuilder(smpl.Metric...).Del(ruletypes.MetricNameLabel).Del(ruletypes.TemporalityLabel)
resultLabels := ruletypes.NewBuilder(smpl.Metric...).Del(ruletypes.MetricNameLabel).Del(ruletypes.TemporalityLabel).Labels()
lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel)
resultLabels := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel).Labels()
for name, value := range r.labels.Map() {
lb.Set(name, expand(value))
}
lb.Set(ruletypes.AlertNameLabel, r.Name())
lb.Set(ruletypes.AlertRuleIDLabel, r.ID())
lb.Set(ruletypes.RuleSourceLabel, r.GeneratorURL())
lb.Set(labels.AlertNameLabel, r.Name())
lb.Set(labels.AlertRuleIdLabel, r.ID())
lb.Set(labels.RuleSourceLabel, r.GeneratorURL())
annotations := make(ruletypes.Labels, 0, len(r.annotations.Map()))
annotations := make(labels.Labels, 0, len(r.annotations.Map()))
for name, value := range r.annotations.Map() {
annotations = append(annotations, ruletypes.Label{Name: name, Value: expand(value)})
annotations = append(annotations, labels.Label{Name: name, Value: expand(value)})
}
if smpl.IsMissing {
lb.Set(ruletypes.AlertNameLabel, "[No data] "+r.Name())
lb.Set(ruletypes.NoDataLabel, "true")
lb.Set(labels.AlertNameLabel, "[No data] "+r.Name())
lb.Set(labels.NoDataLabel, "true")
}
// Links with timestamps should go in annotations since labels
@@ -350,13 +673,13 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
link := r.prepareLinksToTraces(ctx, ts, smpl.Metric)
if link != "" && r.hostFromSource() != "" {
r.logger.InfoContext(ctx, "adding traces link to annotations", "link", fmt.Sprintf("%s/traces-explorer?%s", r.hostFromSource(), link))
annotations = append(annotations, ruletypes.Label{Name: "related_traces", Value: fmt.Sprintf("%s/traces-explorer?%s", r.hostFromSource(), link)})
annotations = append(annotations, labels.Label{Name: "related_traces", Value: fmt.Sprintf("%s/traces-explorer?%s", r.hostFromSource(), link)})
}
case ruletypes.AlertTypeLogs:
link := r.prepareLinksToLogs(ctx, ts, smpl.Metric)
if link != "" && r.hostFromSource() != "" {
r.logger.InfoContext(ctx, "adding logs link to annotations", "link", fmt.Sprintf("%s/logs/logs-explorer?%s", r.hostFromSource(), link))
annotations = append(annotations, ruletypes.Label{Name: "related_logs", Value: fmt.Sprintf("%s/logs/logs-explorer?%s", r.hostFromSource(), link)})
annotations = append(annotations, labels.Label{Name: "related_logs", Value: fmt.Sprintf("%s/logs/logs-explorer?%s", r.hostFromSource(), link)})
}
}
@@ -365,15 +688,15 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
return 0, errors.NewInternalf(errors.CodeInternal, "duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
return 0, fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
}
alerts[h] = &ruletypes.Alert{
Labels: lbs,
QueryResultLabels: resultLabels,
QueryResultLables: resultLabels,
Annotations: annotations,
ActiveAt: ts,
State: ruletypes.StatePending,
State: model.StatePending,
Value: smpl.V,
GeneratorURL: r.GeneratorURL(),
Receivers: ruleReceiverMap[lbs.Map()[ruletypes.LabelThresholdName]],
@@ -388,7 +711,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
// Update the last value and annotations if so, create a new alert entry otherwise.
if alert, ok := r.Active[h]; ok && alert.State != ruletypes.StateInactive {
if alert, ok := r.Active[h]; ok && alert.State != model.StateInactive {
alert.Value = a.Value
alert.Annotations = a.Annotations
@@ -404,78 +727,78 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
r.Active[h] = a
}
itemsToAdd := []rulestatehistorytypes.RuleStateHistory{}
itemsToAdd := []model.RuleStateHistory{}
// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
for fp, a := range r.Active {
labelsJSON, err := json.Marshal(a.QueryResultLabels)
labelsJSON, err := json.Marshal(a.QueryResultLables)
if err != nil {
r.logger.ErrorContext(ctx, "error marshaling labels", errors.Attr(err), "labels", a.Labels)
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == ruletypes.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > ruletypes.ResolvedRetention) {
if a.State == model.StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > ruletypes.ResolvedRetention) {
delete(r.Active, fp)
}
if a.State != ruletypes.StateInactive {
if a.State != model.StateInactive {
r.logger.DebugContext(ctx, "converting firing alert to inActive", "name", r.Name())
a.State = ruletypes.StateInactive
a.State = model.StateInactive
a.ResolvedAt = ts
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: ruletypes.StateInactive,
State: model.StateInactive,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: rulestatehistorytypes.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLabels.Hash(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}
continue
}
if a.State == ruletypes.StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration.Duration() {
if a.State == model.StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration.Duration() {
r.logger.DebugContext(ctx, "converting pending alert to firing", "name", r.Name())
a.State = ruletypes.StateFiring
a.State = model.StateFiring
a.FiredAt = ts
state := ruletypes.StateFiring
state := model.StateFiring
if a.Missing {
state = ruletypes.StateNoData
state = model.StateNoData
}
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: state,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: rulestatehistorytypes.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLabels.Hash(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}
// We need to change firing alert to recovering if the returned sample meets recovery threshold
changeAlertingToRecovering := a.State == ruletypes.StateFiring && a.IsRecovering
changeAlertingToRecovering := a.State == model.StateFiring && a.IsRecovering
// We need to change recovering alerts to firing if the returned sample meets target threshold
changeRecoveringToFiring := a.State == ruletypes.StateRecovering && !a.IsRecovering && !a.Missing
changeRecoveringToFiring := a.State == model.StateRecovering && !a.IsRecovering && !a.Missing
// in any of the above case we need to update the status of alert
if changeAlertingToRecovering || changeRecoveringToFiring {
state := ruletypes.StateRecovering
state := model.StateRecovering
if changeRecoveringToFiring {
state = ruletypes.StateFiring
state = model.StateFiring
}
a.State = state
r.logger.DebugContext(ctx, "converting alert state", "name", r.Name(), "state", state)
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
itemsToAdd = append(itemsToAdd, model.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
State: state,
StateChanged: true,
UnixMilli: ts.UnixMilli(),
Labels: rulestatehistorytypes.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLabels.Hash(),
Labels: model.LabelsString(labelsJSON),
Fingerprint: a.QueryResultLables.Hash(),
Value: a.Value,
})
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,35 @@
package times
import (
"math"
"time"
)
const (
// MinimumTick is the minimum supported time resolution. This has to be
// at least time.Second in order for the code below to work.
minimumTick = time.Millisecond
// second is the Time duration equivalent to one second.
second = int64(time.Second / minimumTick)
// The number of nanoseconds per minimum tick.
nanosPerTick = int64(minimumTick / time.Nanosecond)
// Earliest is the earliest Time representable. Handy for
// initializing a high watermark.
Earliest = Time(math.MinInt64)
// Latest is the latest Time representable. Handy for initializing
// a low watermark.
Latest = Time(math.MaxInt64)
)
type Time int64
// TimeFromUnixNano returns the Time equivalent to the Unix Time
// t provided in nanoseconds.
func TimeFromUnixNano(t int64) Time {
return Time(t / nanosPerTick)
}
func (t Time) Time() time.Time {
return time.Unix(int64(t)/second, (int64(t)%second)*nanosPerTick)
}

View File

@@ -0,0 +1,13 @@
package timestamp
import "time"
// FromTime returns a new millisecond timestamp from a time.
func FromTime(t time.Time) int64 {
return t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond)
}
// Time returns a new time.Time object from a millisecond timestamp.
func Time(ts int64) time.Time {
return time.Unix(ts/1000, (ts%1000)*int64(time.Millisecond))
}

View File

@@ -139,7 +139,8 @@ func (r *rule) GetStoredRulesByMetricName(ctx context.Context, orgID string, met
// Check conditions: must be metric-based alert with valid composite query
if ruleData.AlertType != ruletypes.AlertTypeMetric ||
ruleData.RuleCondition == nil {
ruleData.RuleCondition == nil ||
ruleData.RuleCondition.CompositeQuery == nil {
continue
}

View File

@@ -3,6 +3,7 @@ package telemetrylogs
import (
"fmt"
"slices"
"strings"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
@@ -33,182 +34,34 @@ func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType te
// BuildCondition builds the full WHERE condition for body_v2 JSON paths.
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
conditions := []string{}
for _, node := range c.key.JSONPlan {
condition, err := c.emitPlannedCondition(node, operator, value, sb)
if err != nil {
return "", err
}
conditions = append(conditions, condition)
baseCond, err := c.emitPlannedCondition(operator, value, sb)
if err != nil {
return "", err
}
baseCond := sb.Or(conditions...)
// path index
if operator.AddDefaultExistsFilter() {
pathIndex := fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(LogsV2BodyV2Column), c.key.ArrayParentPaths()[0])
return sb.And(baseCond, pathIndex), nil
}
return baseCond, nil
}
// emitPlannedCondition handles paths with array traversal.
func (c *jsonConditionBuilder) emitPlannedCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
func (c *jsonConditionBuilder) emitPlannedCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
// Build traversal + terminal recursively per-hop
compiled, err := c.recurseArrayHops(node, operator, value, sb)
if err != nil {
return "", err
}
return compiled, nil
}
// buildTerminalCondition creates the innermost condition.
func (c *jsonConditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if node.TerminalConfig.ElemType.IsArray {
conditions := []string{}
// if the value type is not an array
// TODO(piyush): Confirm the Query built for Array case and add testcases for it later
if !c.valueType.IsArray {
// if operator is a String search Operator, then we need to build one more String comparison condition along with the Strict match condition
if operator.IsStringSearchOperator() {
formattedValue := querybuilder.FormatValueForContains(value)
arrayCond, err := c.buildArrayMembershipCondition(node, operator, formattedValue, sb)
if err != nil {
return "", err
}
conditions = append(conditions, arrayCond)
}
// switch operator for array membership checks
switch operator {
case qbtypes.FilterOperatorContains:
operator = qbtypes.FilterOperatorEqual
case qbtypes.FilterOperatorNotContains:
operator = qbtypes.FilterOperatorNotEqual
}
}
arrayCond, err := c.buildArrayMembershipCondition(node, operator, value, sb)
if err != nil {
return "", err
}
conditions = append(conditions, arrayCond)
// or the conditions together
return sb.Or(conditions...), nil
}
return c.buildPrimitiveTerminalCondition(node, operator, value, sb)
}
// buildPrimitiveTerminalCondition builds the condition if the terminal node is a primitive type
// it handles the data type collisions and utilizes indexes for the condition if available.
func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
fieldPath := node.FieldPath()
conditions := []string{}
var formattedValue = value
if operator.IsStringSearchOperator() {
formattedValue = querybuilder.FormatValueForContains(value)
}
elemType := node.TerminalConfig.ElemType
fieldExpr := fmt.Sprintf("dynamicElement(%s, '%s')", fieldPath, elemType.StringValue())
fieldExpr, formattedValue = querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, formattedValue, fieldExpr, operator)
// utilize indexes for the condition if available
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.JSONDataTypeIndex) bool {
return index.Type == elemType && index.ColumnExpression == fieldPath
})
if elemType.IndexSupported && indexed {
indexedExpr := assumeNotNull(fieldPath, elemType)
emptyValue := func() any {
switch elemType {
case telemetrytypes.String:
return ""
case telemetrytypes.Int64, telemetrytypes.Float64, telemetrytypes.Bool:
return 0
default:
return nil
}
}()
// switch the operator and value for exists and not exists
switch operator {
case qbtypes.FilterOperatorExists:
operator = qbtypes.FilterOperatorNotEqual
value = emptyValue
case qbtypes.FilterOperatorNotExists:
operator = qbtypes.FilterOperatorEqual
value = emptyValue
default:
// do nothing
}
indexedExpr, indexedComparisonValue := querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, formattedValue, indexedExpr, operator)
cond, err := c.applyOperator(sb, indexedExpr, operator, indexedComparisonValue)
for _, node := range c.key.JSONPlan {
condition, err := c.recurseArrayHops(node, operator, value, sb)
if err != nil {
return "", err
}
// if qb has a definitive value, we can skip adding a condition to
// check the existence of the path in the json column
if value != emptyValue {
return cond, nil
}
conditions = append(conditions, cond)
// Switch operator to EXISTS since indexed paths on assumedNotNull, indexes will always have a default value
// So we flip the operator to Exists and filter the rows that actually have the value
operator = qbtypes.FilterOperatorExists
conditions = append(conditions, condition)
}
cond, err := c.applyOperator(sb, fieldExpr, operator, formattedValue)
if err != nil {
return "", err
}
conditions = append(conditions, cond)
if len(conditions) > 1 {
return sb.And(conditions...), nil
}
return conditions[0], nil
return sb.Or(conditions...), nil
}
// buildArrayMembershipCondition handles array membership checks.
func (c *jsonConditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
arrayPath := node.FieldPath()
localKeyCopy := *node.TerminalConfig.Key
// create typed array out of a dynamic array
filteredDynamicExpr := func() string {
// Change the field data type from []dynamic to the value type
// since we've filtered the value type out of the dynamic array, we need to change the field data corresponding to the value type
localKeyCopy.FieldDataType = telemetrytypes.MappingJSONDataTypeToFieldDataType[telemetrytypes.ScalerTypeToArrayType[c.valueType]]
baseArrayDynamicExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", arrayPath)
return fmt.Sprintf("arrayMap(x->dynamicElement(x, '%s'), arrayFilter(x->(dynamicType(x) = '%s'), %s))",
c.valueType.StringValue(),
c.valueType.StringValue(),
baseArrayDynamicExpr)
}
typedArrayExpr := func() string {
return fmt.Sprintf("dynamicElement(%s, '%s')", arrayPath, node.TerminalConfig.ElemType.StringValue())
}
var arrayExpr string
if node.TerminalConfig.ElemType == telemetrytypes.ArrayDynamic {
arrayExpr = filteredDynamicExpr()
} else {
arrayExpr = typedArrayExpr()
}
key := "x"
fieldExpr, value := querybuilder.DataTypeCollisionHandledFieldName(&localKeyCopy, value, key, operator)
op, err := c.applyOperator(sb, fieldExpr, operator, value)
if err != nil {
return "", err
}
return fmt.Sprintf("arrayExists(%s -> %s, %s)", key, op, arrayExpr), nil
}
// recurseArrayHops recursively builds array traversal conditions.
func (c *jsonConditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if current == nil {
return "", errors.NewInternalf(CodeArrayNavigationFailed, "navigation failed, current node is nil")
@@ -222,6 +75,33 @@ func (c *jsonConditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAcce
return terminalCond, nil
}
// apply NOT at top level arrayExists so that any subsequent arrayExists fails we count it as true (matching log)
yes, operator := applyNotCondition(operator)
condition, err := c.buildAccessNodeBranches(current, operator, value, sb)
if err != nil {
return "", err
}
if yes {
return sb.Not(condition), nil
}
return condition, nil
}
// applyNotCondition reports whether the given operator is a negative operator
// and, if so, returns its positive inverse so the caller can build the positive
// condition and wrap it in NOT at the top level. Non-negative operators are
// returned unchanged.
func applyNotCondition(operator qbtypes.FilterOperator) (bool, qbtypes.FilterOperator) {
	negated := operator.IsNegativeOperator()
	if !negated {
		return false, operator
	}
	return true, operator.Inverse()
}
// buildAccessNodeBranches builds conditions for each branch of the access node
func (c *jsonConditionBuilder) buildAccessNodeBranches(current *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if current == nil {
return "", errors.NewInternalf(CodeArrayNavigationFailed, "navigation failed, current node is nil")
}
currAlias := current.Alias()
fieldPath := current.FieldPath()
// Determine availability of Array(JSON) and Array(Dynamic) at this hop
@@ -256,6 +136,213 @@ func (c *jsonConditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAcce
return sb.Or(branches...), nil
}
// buildTerminalCondition creates the innermost (leaf) condition for a JSON
// path. Primitive leaves are delegated to buildPrimitiveTerminalCondition;
// array leaves are built as membership checks, with negative operators first
// rewritten to their positive inverse and the whole condition wrapped in NOT.
func (c *jsonConditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
	if !node.TerminalConfig.ElemType.IsArray {
		return c.buildPrimitiveTerminalCondition(node, operator, value, sb)
	}
	// Note: applyNotCondition returns true here only when the top-level path is
	// an array and the operator is negative; otherwise this is reached via
	// buildAccessNodeBranches, where the operator has already been inverted if
	// needed.
	negated, positiveOp := applyNotCondition(operator)
	arrayCond, err := c.buildTerminalArrayCondition(node, positiveOp, value, sb)
	if err != nil {
		return "", err
	}
	if negated {
		return sb.Not(arrayCond), nil
	}
	return arrayCond, nil
}
// getEmptyValue returns the zero value that assumeNotNull materializes for a
// missing path of the given JSON element type: "" for strings, 0 for numeric
// and boolean types, and nil for everything else.
func getEmptyValue(elemType telemetrytypes.JSONDataType) any {
	if elemType == telemetrytypes.String {
		return ""
	}
	if elemType == telemetrytypes.Int64 || elemType == telemetrytypes.Float64 || elemType == telemetrytypes.Bool {
		return 0
	}
	return nil
}
// terminalIndexedCondition builds a comparison against the assumeNotNull-wrapped
// dynamicElement expression for a primitive JSON leaf, so the data-type index on
// that expression can be used. Array paths cannot be indexed and are rejected
// with an internal error.
func (c *jsonConditionBuilder) terminalIndexedCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
	fieldPath := node.FieldPath()
	// indexes are only built for non-array paths
	if strings.Contains(fieldPath, telemetrytypes.ArraySepSuffix) {
		return "", errors.NewInternalf(CodeArrayNavigationFailed, "can not build index condition for array field %s", fieldPath)
	}
	elemType := node.TerminalConfig.ElemType
	dynamicExpr := fmt.Sprintf("dynamicElement(%s, '%s')", fieldPath, elemType.StringValue())
	indexedExpr := assumeNotNull(dynamicExpr)
	// switch the operator and value for exists and not exists:
	// assumeNotNull maps a missing path to the type's zero value, so
	// EXISTS / NOT EXISTS are rewritten as (in)equality against that zero value.
	switch operator {
	case qbtypes.FilterOperatorExists:
		operator = qbtypes.FilterOperatorNotEqual
		value = getEmptyValue(elemType)
	case qbtypes.FilterOperatorNotExists:
		operator = qbtypes.FilterOperatorEqual
		value = getEmptyValue(elemType)
	default:
		// do nothing
	}
	// resolve any data-type collision between the column type and the value
	// before applying the operator
	indexedExpr, formattedValue := querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, value, indexedExpr, operator)
	cond, err := c.applyOperator(sb, indexedExpr, operator, formattedValue)
	if err != nil {
		return "", err
	}
	return cond, nil
}
// buildPrimitiveTerminalCondition builds the condition if the terminal node is a primitive type
// it handles the data type collisions and utilizes indexes for the condition if available.
// When an index exists for the leaf's element type, the indexed condition may be
// combined (AND) with a plain dynamicElement condition to also verify path existence.
func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
	fieldPath := node.FieldPath()
	conditions := []string{}
	// utilize indexes for the condition if available
	//
	// Note: Indexing code doesn't get executed for Array Nested fields because they can not be indexed
	indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.JSONDataTypeIndex) bool {
		return index.Type == node.TerminalConfig.ElemType
	})
	if node.TerminalConfig.ElemType.IndexSupported && indexed {
		indexCond, err := c.terminalIndexedCondition(node, operator, value, sb)
		if err != nil {
			return "", err
		}
		// if qb has a definitive value, we can skip adding a condition to
		// check the existence of the path in the json column
		// (a non-zero comparison value cannot match the assumeNotNull default)
		if value != nil && value != getEmptyValue(node.TerminalConfig.ElemType) {
			return indexCond, nil
		}
		conditions = append(conditions, indexCond)
		// Switch operator to EXISTS except when operator is NOT EXISTS since
		// indexed paths on assumedNotNull, indexes will always have a default
		// value so we flip the operator to Exists and filter the rows that
		// actually have the value
		if operator != qbtypes.FilterOperatorNotExists {
			operator = qbtypes.FilterOperatorExists
		}
	}
	var formattedValue any = value
	if operator.IsStringSearchOperator() {
		// string-search operators (contains/like variants) need the value
		// formatted for substring matching
		formattedValue = querybuilder.FormatValueForContains(value)
	}
	fieldExpr := fmt.Sprintf("dynamicElement(%s, '%s')", fieldPath, node.TerminalConfig.ElemType.StringValue())
	// if operator is negative and has a value comparison i.e. excluding EXISTS and NOT EXISTS, we need to assume that the field exists everywhere
	//
	// Note: here applyNotCondition will return true only if; top level path is being queried and operator is a negative operator
	// Otherwise this code will be triggered by buildAccessNodeBranches; Where operator would've been already inverted if needed.
	if node.IsNonNestedPath() {
		yes, _ := applyNotCondition(operator)
		if yes {
			switch operator {
			case qbtypes.FilterOperatorNotExists:
				// skip
			default:
				fieldExpr = assumeNotNull(fieldExpr)
			}
		}
	}
	fieldExpr, formattedValue = querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, formattedValue, fieldExpr, operator)
	cond, err := c.applyOperator(sb, fieldExpr, operator, formattedValue)
	if err != nil {
		return "", err
	}
	conditions = append(conditions, cond)
	// two conditions only when the indexed branch added an existence check
	if len(conditions) > 1 {
		return sb.And(conditions...), nil
	}
	return conditions[0], nil
}
// buildTerminalArrayCondition builds the membership condition(s) for an
// array-typed leaf. For string-search operators it emits a substring-style
// membership check AND a strict (in)equality membership check, OR-ed together,
// since the value may match either part of an element or a whole element.
func (c *jsonConditionBuilder) buildTerminalArrayCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
	var memberConds []string
	if operator.IsStringSearchOperator() {
		searchCond, err := c.buildArrayMembershipCondition(node, operator, querybuilder.FormatValueForContains(value), sb)
		if err != nil {
			return "", err
		}
		memberConds = append(memberConds, searchCond)
		// downgrade contains/not-contains to strict (in)equality for the
		// second membership check
		if operator == qbtypes.FilterOperatorContains {
			operator = qbtypes.FilterOperatorEqual
		} else if operator == qbtypes.FilterOperatorNotContains {
			operator = qbtypes.FilterOperatorNotEqual
		}
	}
	strictCond, err := c.buildArrayMembershipCondition(node, operator, value, sb)
	if err != nil {
		return "", err
	}
	memberConds = append(memberConds, strictCond)
	if len(memberConds) == 1 {
		return memberConds[0], nil
	}
	return sb.Or(memberConds...), nil
}
// buildArrayMembershipCondition builds condition of the part where Arrays becomes primitive typed Arrays
// e.g. [300, 404, 500], and value operations will work on the array elements.
// The result is an arrayExists(x -> <op>, <typed array expr>) expression.
func (c *jsonConditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
	arrayPath := node.FieldPath()
	// copy the key so the FieldDataType mutation below does not leak to the caller
	localKeyCopy := *node.TerminalConfig.Key
	// create typed array out of a dynamic array
	filteredDynamicExpr := func() string {
		// Change the field data type from []dynamic to the value type
		// since we've filtered the value type out of the dynamic array, we need to change the field data corresponding to the value type
		localKeyCopy.FieldDataType = telemetrytypes.MappingJSONDataTypeToFieldDataType[telemetrytypes.ScalerTypeToArrayType[c.valueType]]
		primitiveType := c.valueType.StringValue()
		// check if value is an array
		// NOTE(review): for array-typed query values the element (scaler) type is
		// used to filter the dynamic array — confirm ScalerType is the element
		// type's ClickHouse name.
		if c.valueType.IsArray {
			primitiveType = c.valueType.ScalerType
		}
		baseArrayDynamicExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", arrayPath)
		// keep only elements whose dynamic type matches, then unwrap them to the typed value
		return fmt.Sprintf("arrayMap(x->dynamicElement(x, '%s'), arrayFilter(x->(dynamicType(x) = '%s'), %s))",
			primitiveType,
			primitiveType,
			baseArrayDynamicExpr)
	}
	typedArrayExpr := func() string {
		return fmt.Sprintf("dynamicElement(%s, '%s')", arrayPath, node.TerminalConfig.ElemType.StringValue())
	}
	var arrayExpr string
	if node.TerminalConfig.ElemType == telemetrytypes.ArrayDynamic {
		arrayExpr = filteredDynamicExpr()
	} else {
		arrayExpr = typedArrayExpr()
	}
	// "x" is the lambda parameter for the per-element comparison
	key := "x"
	fieldExpr, value := querybuilder.DataTypeCollisionHandledFieldName(&localKeyCopy, value, key, operator)
	op, err := c.applyOperator(sb, fieldExpr, operator, value)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("arrayExists(%s -> %s, %s)", key, op, arrayExpr), nil
}
func (c *jsonConditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, fieldExpr string, operator qbtypes.FilterOperator, value any) (string, error) {
switch operator {
case qbtypes.FilterOperatorEqual:
@@ -317,6 +404,6 @@ func (c *jsonConditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, field
}
}
func assumeNotNull(column string, elemType telemetrytypes.JSONDataType) string {
return fmt.Sprintf("assumeNotNull(dynamicElement(%s, '%s'))", column, elemType.StringValue())
func assumeNotNull(fieldExpr string) string {
return fmt.Sprintf("assumeNotNull(%s)", fieldExpr)
}

File diff suppressed because one or more lines are too long

View File

@@ -113,6 +113,29 @@ const (
FilterOperatorNotContains
)
// operatorInverseMapping pairs each filter operator with its logical negation;
// it backs FilterOperator.Inverse so negative predicates can be rewritten as
// NOT(<positive predicate>).
var operatorInverseMapping = map[FilterOperator]FilterOperator{
	FilterOperatorEqual:           FilterOperatorNotEqual,
	FilterOperatorNotEqual:        FilterOperatorEqual,
	FilterOperatorGreaterThan:     FilterOperatorLessThanOrEq,
	FilterOperatorGreaterThanOrEq: FilterOperatorLessThan,
	FilterOperatorLessThan:        FilterOperatorGreaterThanOrEq,
	FilterOperatorLessThanOrEq:    FilterOperatorGreaterThan,
	FilterOperatorLike:            FilterOperatorNotLike,
	FilterOperatorNotLike:         FilterOperatorLike,
	FilterOperatorILike:           FilterOperatorNotILike,
	FilterOperatorNotILike:        FilterOperatorILike,
	FilterOperatorBetween:         FilterOperatorNotBetween,
	FilterOperatorNotBetween:      FilterOperatorBetween,
	FilterOperatorIn:              FilterOperatorNotIn,
	FilterOperatorNotIn:           FilterOperatorIn,
	FilterOperatorExists:          FilterOperatorNotExists,
	FilterOperatorNotExists:       FilterOperatorExists,
	FilterOperatorRegexp:          FilterOperatorNotRegexp,
	FilterOperatorNotRegexp:       FilterOperatorRegexp,
	FilterOperatorContains:        FilterOperatorNotContains,
	FilterOperatorNotContains:     FilterOperatorContains,
}
// AddDefaultExistsFilter returns true if addl exists filter should be added to the query
// For the negative predicates, we don't want to add the exists filter. Why?
// Say for example, user adds a filter `service.name != "redis"`, we can't interpret it
@@ -162,6 +185,10 @@ func (f FilterOperator) IsNegativeOperator() bool {
return true
}
// Inverse returns the logical negation of the operator (e.g. = <-> !=).
// NOTE(review): operators missing from operatorInverseMapping yield the map's
// zero value — confirm every FilterOperator has an entry.
func (f FilterOperator) Inverse() FilterOperator {
	return operatorInverseMapping[f]
}
func (f FilterOperator) IsComparisonOperator() bool {
switch f {
case FilterOperatorGreaterThan, FilterOperatorGreaterThanOrEq, FilterOperatorLessThan, FilterOperatorLessThanOrEq:

View File

@@ -96,21 +96,6 @@ type TimeSeries struct {
Values []*TimeSeriesValue `json:"values"`
}
// EvaluableValues returns only the values where Partial is false and value is not NaN or +/- Inf.
// A nil receiver yields nil.
// TODO(srikanthccv): should we skip them in the consume.go?
func (ts *TimeSeries) EvaluableValues() []*TimeSeriesValue {
	if ts == nil {
		return nil
	}
	evaluable := make([]*TimeSeriesValue, 0, len(ts.Values))
	for _, val := range ts.Values {
		// drop partial buckets and non-finite values
		if val.Partial || math.IsNaN(val.Value) || math.IsInf(val.Value, 0) {
			continue
		}
		evaluable = append(evaluable, val)
	}
	return evaluable
}
type Label struct {
Key telemetrytypes.TelemetryFieldKey `json:"key"`
Value any `json:"value"`

View File

@@ -1,9 +1,6 @@
package rulestatehistorytypes
import (
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
)
import qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
// PostableRuleStateHistoryBaseQuery defines URL query params common across v2 rule history APIs.
type PostableRuleStateHistoryBaseQuery struct {
@@ -15,7 +12,7 @@ type PostableRuleStateHistoryBaseQuery struct {
type PostableRuleStateHistoryTimelineQuery struct {
Start int64 `query:"start" required:"true"`
End int64 `query:"end" required:"true"`
State ruletypes.AlertState `query:"state"`
State AlertState `query:"state"`
FilterExpression string `query:"filterExpression"`
Limit int64 `query:"limit"`
Order qbtypes.OrderDirection `query:"order"`

View File

@@ -3,13 +3,12 @@ package rulestatehistorytypes
import (
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
)
type Query struct {
Start int64
End int64
State ruletypes.AlertState
State AlertState
FilterExpression qbtypes.Filter
Limit int64
Offset int64

View File

@@ -2,7 +2,6 @@ package rulestatehistorytypes
import (
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
)
type GettableRuleStateTimeline struct {
@@ -12,16 +11,16 @@ type GettableRuleStateTimeline struct {
}
type GettableRuleStateHistory struct {
RuleID string `json:"ruleId" required:"true"`
RuleName string `json:"ruleName" required:"true"`
OverallState ruletypes.AlertState `json:"overallState" required:"true"`
OverallStateChanged bool `json:"overallStateChanged" required:"true"`
State ruletypes.AlertState `json:"state" required:"true"`
StateChanged bool `json:"stateChanged" required:"true"`
UnixMilli int64 `json:"unixMilli" required:"true"`
Labels []*qbtypes.Label `json:"labels" required:"true"`
Fingerprint uint64 `json:"fingerprint" required:"true"`
Value float64 `json:"value" required:"true"`
RuleID string `json:"ruleID" required:"true"`
RuleName string `json:"ruleName" required:"true"`
OverallState AlertState `json:"overallState" required:"true"`
OverallStateChanged bool `json:"overallStateChanged" required:"true"`
State AlertState `json:"state" required:"true"`
StateChanged bool `json:"stateChanged" required:"true"`
UnixMilli int64 `json:"unixMilli" required:"true"`
Labels []*qbtypes.Label `json:"labels" required:"true"`
Fingerprint uint64 `json:"fingerprint" required:"true"`
Value float64 `json:"value" required:"true"`
}
type GettableRuleStateHistoryContributor struct {
@@ -33,9 +32,9 @@ type GettableRuleStateHistoryContributor struct {
}
type GettableRuleStateWindow struct {
State ruletypes.AlertState `json:"state" ch:"state" required:"true"`
Start int64 `json:"start" ch:"start" required:"true"`
End int64 `json:"end" ch:"end" required:"true"`
State AlertState `json:"state" ch:"state" required:"true"`
Start int64 `json:"start" ch:"start" required:"true"`
End int64 `json:"end" ch:"end" required:"true"`
}
type GettableRuleStateHistoryStats struct {

View File

@@ -9,12 +9,36 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// AlertState represents the state of an alert as recorded in rule state history.
type AlertState struct {
	valuer.String
}

// All recognized alert states.
var (
	StateInactive = AlertState{valuer.NewString("inactive")}
	StatePending = AlertState{valuer.NewString("pending")}
	StateRecovering = AlertState{valuer.NewString("recovering")}
	StateFiring = AlertState{valuer.NewString("firing")}
	StateNoData = AlertState{valuer.NewString("nodata")}
	StateDisabled = AlertState{valuer.NewString("disabled")}
)

// LabelsString is a serialized label set; its Scan method decodes the raw DB value.
type LabelsString string

// Enum returns every valid AlertState value.
func (AlertState) Enum() []any {
	return []any{
		StateInactive,
		StatePending,
		StateRecovering,
		StateFiring,
		StateNoData,
		StateDisabled,
	}
}
func (l *LabelsString) Scan(src any) error {
switch data := src.(type) {
case nil:
@@ -66,15 +90,15 @@ type RuleStateHistory struct {
RuleID string `ch:"rule_id"`
RuleName string `ch:"rule_name"`
OverallState ruletypes.AlertState `ch:"overall_state"`
OverallStateChanged bool `ch:"overall_state_changed"`
OverallState AlertState `ch:"overall_state"`
OverallStateChanged bool `ch:"overall_state_changed"`
State ruletypes.AlertState `ch:"state"`
StateChanged bool `ch:"state_changed"`
UnixMilli int64 `ch:"unix_milli"`
Labels LabelsString `ch:"labels"`
Fingerprint uint64 `ch:"fingerprint"`
Value float64 `ch:"value"`
State AlertState `ch:"state"`
StateChanged bool `ch:"state_changed"`
UnixMilli int64 `ch:"unix_milli"`
Labels LabelsString `ch:"labels"`
Fingerprint uint64 `ch:"fingerprint"`
Value float64 `ch:"value"`
}
type RuleStateHistoryContributor struct {

View File

@@ -1,40 +0,0 @@
package ruletypes
import "github.com/SigNoz/signoz/pkg/valuer"
// AlertState represents the state of an alert rule.
type AlertState struct {
	valuer.String
}

// All recognized alert states.
var (
	StateInactive = AlertState{valuer.NewString("inactive")}
	StatePending = AlertState{valuer.NewString("pending")}
	StateRecovering = AlertState{valuer.NewString("recovering")}
	StateFiring = AlertState{valuer.NewString("firing")}
	StateNoData = AlertState{valuer.NewString("nodata")}
	StateDisabled = AlertState{valuer.NewString("disabled")}
)

// Enum returns every valid AlertState value.
func (AlertState) Enum() []any {
	return []any{
		StateInactive,
		StatePending,
		StateRecovering,
		StateFiring,
		StateNoData,
		StateDisabled,
	}
}

// alertStateSeverity orders states from least (inactive) to most severe (disabled).
var alertStateSeverity = map[AlertState]int{
	StateInactive: 0,
	StatePending: 1,
	StateRecovering: 2,
	StateFiring: 3,
	StateNoData: 4,
	StateDisabled: 5,
}

// Severity returns the numeric severity rank of the state (higher is more severe).
// NOTE(review): unknown states return 0, same as StateInactive — confirm intended.
func (a AlertState) Severity() int {
	return alertStateSeverity[a]
}

View File

@@ -8,24 +8,44 @@ import (
"strings"
"time"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
// this file contains common structs and methods used by
// rule engine
const (
// how long before re-sending the alert.
ResolvedRetention = 15 * time.Minute
TestAlertPostFix = "_TEST_ALERT"
AlertTimeFormat = "2006-01-02 15:04:05"
)
type RuleType string
const (
RuleTypeThreshold = "threshold_rule"
RuleTypeProm = "promql_rule"
RuleTypeAnomaly = "anomaly_rule"
)
type RuleHealth string
const (
HealthUnknown RuleHealth = "unknown"
HealthGood RuleHealth = "ok"
HealthBad RuleHealth = "err"
)
type Alert struct {
State AlertState
State model.AlertState
Labels Labels
Annotations Labels
Labels labels.BaseLabels
Annotations labels.BaseLabels
QueryResultLabels Labels
QueryResultLables labels.BaseLabels
GeneratorURL string
@@ -43,13 +63,8 @@ type Alert struct {
IsRecovering bool
}
type NamedAlert struct {
Name string
*Alert
}
func (a *Alert) NeedsSending(ts time.Time, resendDelay time.Duration) bool {
if a.State == StatePending {
if a.State == model.StatePending {
return false
}
@@ -61,118 +76,153 @@ func (a *Alert) NeedsSending(ts time.Time, resendDelay time.Duration) bool {
return a.LastSentAt.Add(resendDelay).Before(ts)
}
type PanelType struct {
valuer.String
type NamedAlert struct {
Name string
*Alert
}
var (
PanelTypeValue = PanelType{valuer.NewString("value")}
PanelTypeTable = PanelType{valuer.NewString("table")}
PanelTypeGraph = PanelType{valuer.NewString("graph")}
type CompareOp string
const (
CompareOpNone CompareOp = "0"
ValueIsAbove CompareOp = "1"
ValueIsBelow CompareOp = "2"
ValueIsEq CompareOp = "3"
ValueIsNotEq CompareOp = "4"
ValueAboveOrEq CompareOp = "5"
ValueBelowOrEq CompareOp = "6"
ValueOutsideBounds CompareOp = "7"
)
// Note: this is used to represent the state of the alert query
// i.e the active tab which should be used to represent the selection
type MatchType string
type QueryType struct {
valuer.String
}
var (
QueryTypeBuilder = QueryType{String: valuer.NewString("builder")}
QueryTypeClickHouseSQL = QueryType{valuer.NewString("clickhouse_sql")}
QueryTypePromQL = QueryType{valuer.NewString("promql")}
const (
MatchTypeNone MatchType = "0"
AtleastOnce MatchType = "1"
AllTheTimes MatchType = "2"
OnAverage MatchType = "3"
InTotal MatchType = "4"
Last MatchType = "5"
)
type AlertCompositeQuery struct {
Queries []qbtypes.QueryEnvelope `json:"queries"`
PanelType PanelType `json:"panelType"`
QueryType QueryType `json:"queryType"`
// Unit for the time series data shown in the graph
// This is used to format the value and threshold
Unit string `json:"unit,omitempty"`
}
type RuleCondition struct {
CompositeQuery *AlertCompositeQuery `json:"compositeQuery"`
CompareOperator CompareOperator `json:"op"`
Target *float64 `json:"target,omitempty"`
AlertOnAbsent bool `json:"alertOnAbsent,omitempty"`
AbsentFor uint64 `json:"absentFor,omitempty"`
MatchType MatchType `json:"matchType"`
TargetUnit string `json:"targetUnit,omitempty"`
Algorithm string `json:"algorithm,omitempty"`
Seasonality string `json:"seasonality,omitempty"`
SelectedQuery string `json:"selectedQueryName,omitempty"`
RequireMinPoints bool `json:"requireMinPoints,omitempty"`
RequiredNumPoints int `json:"requiredNumPoints,omitempty"`
Thresholds *RuleThresholdData `json:"thresholds,omitempty"`
CompositeQuery *v3.CompositeQuery `json:"compositeQuery,omitempty"`
CompareOp CompareOp `json:"op,omitempty"`
Target *float64 `json:"target,omitempty"`
AlertOnAbsent bool `json:"alertOnAbsent,omitempty"`
AbsentFor uint64 `json:"absentFor,omitempty"`
MatchType MatchType `json:"matchType,omitempty"`
TargetUnit string `json:"targetUnit,omitempty"`
Algorithm string `json:"algorithm,omitempty"`
Seasonality string `json:"seasonality,omitempty"`
SelectedQuery string `json:"selectedQueryName,omitempty"`
RequireMinPoints bool `json:"requireMinPoints,omitempty"`
RequiredNumPoints int `json:"requiredNumPoints,omitempty"`
Thresholds *RuleThresholdData `json:"thresholds,omitempty"`
}
func (rc *RuleCondition) SelectedQueryName() string {
func (rc *RuleCondition) GetSelectedQueryName() string {
if rc != nil {
if rc.SelectedQuery != "" {
return rc.SelectedQuery
}
queryNames := map[string]struct{}{}
queryNames := map[string]struct{}{}
for _, query := range rc.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
if !spec.Disabled {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
if !spec.Disabled {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
if !spec.Disabled {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.QueryBuilderFormula:
if !spec.Disabled {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.ClickHouseQuery:
if !spec.Disabled {
queryNames[spec.Name] = struct{}{}
}
case qbtypes.PromQuery:
if !spec.Disabled {
queryNames[spec.Name] = struct{}{}
if rc.CompositeQuery != nil {
if rc.QueryType() == v3.QueryTypeBuilder {
for name := range rc.CompositeQuery.BuilderQueries {
queryNames[name] = struct{}{}
}
for _, query := range rc.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
queryNames[spec.Name] = struct{}{}
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
queryNames[spec.Name] = struct{}{}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
queryNames[spec.Name] = struct{}{}
case qbtypes.QueryBuilderFormula:
queryNames[spec.Name] = struct{}{}
}
}
} else if rc.QueryType() == v3.QueryTypeClickHouseSQL {
for name := range rc.CompositeQuery.ClickHouseQueries {
queryNames[name] = struct{}{}
}
for _, query := range rc.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case qbtypes.ClickHouseQuery:
queryNames[spec.Name] = struct{}{}
}
}
}
}
}
// The following logic exists for backward compatibility
// If there is no selected query, then
// - check if F1 is present, if yes, return F1
// - else return the query with max ascii value
if _, ok := queryNames["F1"]; ok {
return "F1"
// The following logic exists for backward compatibility
// If there is no selected query, then
// - check if F1 is present, if yes, return F1
// - else return the query with max ascii value
// this logic is not really correct. we should be considering
// whether the query is enabled or not. but this is a temporary
// fix to support backward compatibility
if _, ok := queryNames["F1"]; ok {
return "F1"
}
keys := make([]string, 0, len(queryNames))
for k := range queryNames {
keys = append(keys, k)
}
sort.Strings(keys)
return keys[len(keys)-1]
}
keys := make([]string, 0, len(queryNames))
for k := range queryNames {
keys = append(keys, k)
}
sort.Strings(keys)
return keys[len(keys)-1]
// This should never happen
return ""
}
func (rc *RuleCondition) IsValid() bool {
if rc.CompositeQuery == nil {
return false
}
if rc.QueryType() == v3.QueryTypeBuilder {
if rc.Thresholds == nil {
return false
}
}
if rc.QueryType() == v3.QueryTypePromQL {
if len(rc.CompositeQuery.PromQueries) == 0 && len(rc.CompositeQuery.Queries) == 0 {
return false
}
}
return true
}
// ShouldEval checks if the further series should be evaluated at all for alerts.
func (rc *RuleCondition) ShouldEval(series *qbtypes.TimeSeries) bool {
return !rc.RequireMinPoints || len(series.Values) >= rc.RequiredNumPoints
func (rc *RuleCondition) ShouldEval(series *v3.Series) bool {
if rc == nil {
return true
}
return !rc.RequireMinPoints || len(series.Points) >= rc.RequiredNumPoints
}
// QueryType is a shorthand method to get query type.
func (rc *RuleCondition) QueryType() QueryType {
return rc.CompositeQuery.QueryType
func (rc *RuleCondition) QueryType() v3.QueryType {
if rc.CompositeQuery != nil {
return rc.CompositeQuery.QueryType
}
return v3.QueryTypeUnknown
}
// String is useful in printing rule condition in logs.
func (rc *RuleCondition) String() string {
if rc == nil {
return ""
}
data, _ := json.Marshal(*rc)
return string(data)
}
@@ -180,7 +230,7 @@ func (rc *RuleCondition) String() string {
// PrepareRuleGeneratorURL creates an appropriate url for the rule. The URL is
// sent in Slack messages as well as to other systems and allows backtracking
// to the rule definition from the third party systems.
func PrepareRuleGeneratorURL(ruleID string, source string) string {
func PrepareRuleGeneratorURL(ruleId string, source string) string {
if source == "" {
return source
}
@@ -196,7 +246,7 @@ func PrepareRuleGeneratorURL(ruleID string, source string) string {
hasNew := strings.LastIndex(source, "new")
if hasNew > -1 {
ruleURL := fmt.Sprintf("%sedit?ruleId=%s", source[0:hasNew], ruleID)
ruleURL := fmt.Sprintf("%sedit?ruleId=%s", source[0:hasNew], ruleId)
return ruleURL
}
@@ -205,7 +255,7 @@ func PrepareRuleGeneratorURL(ruleID string, source string) string {
// mainly to keep the URL short and lower the alert body contents
// The generator URL with /alerts/edit?ruleId= is enough
if parsedSource.Port() != "" {
return fmt.Sprintf("%s://%s:%s/alerts/edit?ruleId=%s", parsedSource.Scheme, parsedSource.Hostname(), parsedSource.Port(), ruleID)
return fmt.Sprintf("%s://%s:%s/alerts/edit?ruleId=%s", parsedSource.Scheme, parsedSource.Hostname(), parsedSource.Port(), ruleId)
}
return fmt.Sprintf("%s://%s/alerts/edit?ruleId=%s", parsedSource.Scheme, parsedSource.Hostname(), ruleID)
return fmt.Sprintf("%s://%s/alerts/edit?ruleId=%s", parsedSource.Scheme, parsedSource.Hostname(), ruleId)
}

View File

@@ -10,7 +10,11 @@ import (
"github.com/prometheus/alertmanager/config"
"github.com/SigNoz/signoz/pkg/errors"
signozError "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -36,7 +40,7 @@ const (
// PostableRule is used to create alerting rule from HTTP api.
type PostableRule struct {
AlertName string `json:"alert"`
AlertName string `json:"alert,omitempty"`
AlertType AlertType `json:"alertType,omitempty"`
Description string `json:"description,omitempty"`
RuleType RuleType `json:"ruleType,omitempty"`
@@ -73,17 +77,17 @@ type NotificationSettings struct {
type Renotify struct {
Enabled bool `json:"enabled"`
ReNotifyInterval valuer.TextDuration `json:"interval,omitzero"`
AlertStates []AlertState `json:"alertStates,omitempty"`
AlertStates []model.AlertState `json:"alertStates,omitempty"`
}
func (ns *NotificationSettings) GetAlertManagerNotificationConfig() alertmanagertypes.NotificationConfig {
var renotifyInterval time.Duration
var noDataRenotifyInterval time.Duration
if ns.Renotify.Enabled {
if slices.Contains(ns.Renotify.AlertStates, StateNoData) {
if slices.Contains(ns.Renotify.AlertStates, model.StateNoData) {
noDataRenotifyInterval = ns.Renotify.ReNotifyInterval.Duration()
}
if slices.Contains(ns.Renotify.AlertStates, StateFiring) {
if slices.Contains(ns.Renotify.AlertStates, model.StateFiring) {
renotifyInterval = ns.Renotify.ReNotifyInterval.Duration()
}
} else {
@@ -93,7 +97,7 @@ func (ns *NotificationSettings) GetAlertManagerNotificationConfig() alertmanager
return alertmanagertypes.NewNotificationConfig(ns.GroupBy, renotifyInterval, noDataRenotifyInterval, ns.UsePolicy)
}
func (r *PostableRule) GetRuleRouteRequest(ruleID string) ([]*alertmanagertypes.PostableRoutePolicy, error) {
func (r *PostableRule) GetRuleRouteRequest(ruleId string) ([]*alertmanagertypes.PostableRoutePolicy, error) {
threshold, err := r.RuleCondition.Thresholds.GetRuleThreshold()
if err != nil {
return nil, err
@@ -101,20 +105,20 @@ func (r *PostableRule) GetRuleRouteRequest(ruleID string) ([]*alertmanagertypes.
receivers := threshold.GetRuleReceivers()
routeRequests := make([]*alertmanagertypes.PostableRoutePolicy, 0)
for _, receiver := range receivers {
expression := fmt.Sprintf(`%s == "%s" && %s == "%s"`, LabelThresholdName, receiver.Name, LabelRuleID, ruleID)
expression := fmt.Sprintf(`%s == "%s" && %s == "%s"`, LabelThresholdName, receiver.Name, LabelRuleId, ruleId)
routeRequests = append(routeRequests, &alertmanagertypes.PostableRoutePolicy{
Expression: expression,
ExpressionKind: alertmanagertypes.RuleBasedExpression,
Channels: receiver.Channels,
Name: ruleID,
Description: fmt.Sprintf("Auto-generated route for rule %s", ruleID),
Name: ruleId,
Description: fmt.Sprintf("Auto-generated route for rule %s", ruleId),
Tags: []string{"auto-generated", "rule-based"},
})
}
return routeRequests, nil
}
func (r *PostableRule) GetInhibitRules(ruleID string) ([]config.InhibitRule, error) {
func (r *PostableRule) GetInhibitRules(ruleId string) ([]config.InhibitRule, error) {
threshold, err := r.RuleCondition.Thresholds.GetRuleThreshold()
if err != nil {
return nil, err
@@ -135,8 +139,8 @@ func (r *PostableRule) GetInhibitRules(ruleID string) ([]config.InhibitRule, err
Value: receivers[i].Name,
},
{
Name: LabelRuleID,
Value: ruleID,
Name: LabelRuleId,
Value: ruleId,
},
},
TargetMatchers: config.Matchers{
@@ -145,8 +149,8 @@ func (r *PostableRule) GetInhibitRules(ruleID string) ([]config.InhibitRule, err
Value: receivers[i+1].Name,
},
{
Name: LabelRuleID,
Value: ruleID,
Name: LabelRuleId,
Value: ruleId,
},
},
Equal: groups,
@@ -170,8 +174,8 @@ func (ns *NotificationSettings) UnmarshalJSON(data []byte) error {
// Validate states after unmarshaling
for _, state := range ns.Renotify.AlertStates {
if state != StateFiring && state != StateNoData {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid alert state: %s", state)
if state != model.StateFiring && state != model.StateNoData {
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid alert state: %s", state)
}
}
@@ -181,6 +185,7 @@ func (ns *NotificationSettings) UnmarshalJSON(data []byte) error {
// processRuleDefaults applies the default values
// for the rule options that are blank or unset.
func (r *PostableRule) processRuleDefaults() {
if r.SchemaVersion == "" {
r.SchemaVersion = DefaultSchemaVersion
}
@@ -195,14 +200,21 @@ func (r *PostableRule) processRuleDefaults() {
if r.RuleCondition != nil {
switch r.RuleCondition.CompositeQuery.QueryType {
case QueryTypeBuilder:
if r.RuleType.IsZero() {
case v3.QueryTypeBuilder:
if r.RuleType == "" {
r.RuleType = RuleTypeThreshold
}
case QueryTypePromQL:
case v3.QueryTypePromQL:
r.RuleType = RuleTypeProm
}
for qLabel, q := range r.RuleCondition.CompositeQuery.BuilderQueries {
if q.AggregateAttribute.Key != "" && q.Expression == "" {
q.Expression = qLabel
}
}
//added alerts v2 fields
if r.SchemaVersion == DefaultSchemaVersion {
thresholdName := CriticalThresholdName
if r.Labels != nil {
@@ -213,7 +225,7 @@ func (r *PostableRule) processRuleDefaults() {
// For anomaly detection with ValueIsBelow, negate the target
targetValue := r.RuleCondition.Target
if r.RuleType == RuleTypeAnomaly && r.RuleCondition.CompareOperator == ValueIsBelow && targetValue != nil {
if r.RuleType == RuleTypeAnomaly && r.RuleCondition.CompareOp == ValueIsBelow && targetValue != nil {
negated := -1 * *targetValue
targetValue = &negated
}
@@ -221,12 +233,12 @@ func (r *PostableRule) processRuleDefaults() {
thresholdData := RuleThresholdData{
Kind: BasicThresholdKind,
Spec: BasicRuleThresholds{{
Name: thresholdName,
TargetUnit: r.RuleCondition.TargetUnit,
TargetValue: targetValue,
MatchType: r.RuleCondition.MatchType,
CompareOperator: r.RuleCondition.CompareOperator,
Channels: r.PreferredChannels,
Name: thresholdName,
TargetUnit: r.RuleCondition.TargetUnit,
TargetValue: targetValue,
MatchType: r.RuleCondition.MatchType,
CompareOp: r.RuleCondition.CompareOp,
Channels: r.PreferredChannels,
}},
}
r.RuleCondition.Thresholds = &thresholdData
@@ -235,11 +247,11 @@ func (r *PostableRule) processRuleDefaults() {
Renotify: Renotify{
Enabled: true,
ReNotifyInterval: valuer.MustParseTextDuration("4h"),
AlertStates: []AlertState{StateFiring},
AlertStates: []model.AlertState{model.StateFiring},
},
}
if r.RuleCondition.AlertOnAbsent {
r.NotificationSettings.Renotify.AlertStates = append(r.NotificationSettings.Renotify.AlertStates, StateNoData)
r.NotificationSettings.Renotify.AlertStates = append(r.NotificationSettings.Renotify.AlertStates, model.StateNoData)
}
}
}
@@ -270,7 +282,7 @@ func (r *PostableRule) UnmarshalJSON(bytes []byte) error {
type Alias PostableRule
aux := (*Alias)(r)
if err := json.Unmarshal(bytes, aux); err != nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to parse json: %v", err)
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "failed to parse json: %v", err)
}
r.processRuleDefaults()
return r.validate()
@@ -292,36 +304,83 @@ func isValidLabelValue(v string) bool {
return utf8.ValidString(v)
}
func isAllQueriesDisabled(compositeQuery *v3.CompositeQuery) bool {
if compositeQuery == nil {
return false
}
if compositeQuery.BuilderQueries == nil && compositeQuery.PromQueries == nil && compositeQuery.ClickHouseQueries == nil {
return false
}
switch compositeQuery.QueryType {
case v3.QueryTypeBuilder:
if len(compositeQuery.BuilderQueries) == 0 {
return false
}
for _, query := range compositeQuery.BuilderQueries {
if !query.Disabled {
return false
}
}
case v3.QueryTypePromQL:
if len(compositeQuery.PromQueries) == 0 {
return false
}
for _, query := range compositeQuery.PromQueries {
if !query.Disabled {
return false
}
}
case v3.QueryTypeClickHouseSQL:
if len(compositeQuery.ClickHouseQueries) == 0 {
return false
}
for _, query := range compositeQuery.ClickHouseQueries {
if !query.Disabled {
return false
}
}
}
return true
}
func (r *PostableRule) validate() error {
var errs []error
if r.RuleCondition == nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "rule condition is required")
// will get panic if we try to access CompositeQuery, so return here
return signozError.NewInvalidInputf(signozError.CodeInvalidInput, "rule condition is required")
}
if r.RuleCondition.CompositeQuery == nil {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "composite query is required"))
}
if r.Version != "v5" {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "only version v5 is supported, got %q", r.Version))
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "only version v5 is supported, got %q", r.Version))
}
if isAllQueriesDisabled(r.RuleCondition.CompositeQuery) {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "all queries are disabled in rule condition"))
}
for k, v := range r.Labels {
if !isValidLabelName(k) {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid label name: %s", k))
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid label name: %s", k))
}
if !isValidLabelValue(v) {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid label value: %s", v))
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid label value: %s", v))
}
}
for k := range r.Annotations {
if !isValidLabelName(k) {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid annotation name: %s", k))
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "invalid annotation name: %s", k))
}
}
errs = append(errs, testTemplateParsing(r)...)
return errors.Join(errs...)
return signozError.Join(errs...)
}
func testTemplateParsing(rl *PostableRule) (errs []error) {
@@ -339,6 +398,7 @@ func testTemplateParsing(rl *PostableRule) (errs []error) {
defs+text,
"__alert_"+rl.AlertName,
tmplData,
times.Time(timestamp.FromTime(time.Now())),
nil,
)
return tmpl.ParseTest()
@@ -348,7 +408,7 @@ func testTemplateParsing(rl *PostableRule) (errs []error) {
for _, val := range rl.Labels {
err := parseTest(val)
if err != nil {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "template parsing error: %s", err.Error()))
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "template parsing error: %s", err.Error()))
}
}
@@ -356,7 +416,7 @@ func testTemplateParsing(rl *PostableRule) (errs []error) {
for _, val := range rl.Annotations {
err := parseTest(val)
if err != nil {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "template parsing error: %s", err.Error()))
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "template parsing error: %s", err.Error()))
}
}
@@ -370,8 +430,8 @@ type GettableRules struct {
// GettableRule has info for an alerting rules.
type GettableRule struct {
Id string `json:"id"`
State AlertState `json:"state"`
Id string `json:"id"`
State model.AlertState `json:"state"`
PostableRule
CreatedAt *time.Time `json:"createAt"`
CreatedBy *string `json:"createBy"`

View File

@@ -7,10 +7,88 @@ import (
"github.com/stretchr/testify/assert"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
)
func TestIsAllQueriesDisabled(t *testing.T) {
testCases := []*v3.CompositeQuery{
{
BuilderQueries: map[string]*v3.BuilderQuery{
"query1": {
Disabled: true,
},
"query2": {
Disabled: true,
},
},
QueryType: v3.QueryTypeBuilder,
},
nil,
{
QueryType: v3.QueryTypeBuilder,
},
{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"query1": {
Disabled: true,
},
"query2": {
Disabled: false,
},
},
},
{
QueryType: v3.QueryTypePromQL,
},
{
QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{
"query3": {
Disabled: false,
},
},
},
{
QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{
"query3": {
Disabled: true,
},
},
},
{
QueryType: v3.QueryTypeClickHouseSQL,
},
{
QueryType: v3.QueryTypeClickHouseSQL,
ClickHouseQueries: map[string]*v3.ClickHouseQuery{
"query4": {
Disabled: false,
},
},
},
{
QueryType: v3.QueryTypeClickHouseSQL,
ClickHouseQueries: map[string]*v3.ClickHouseQuery{
"query4": {
Disabled: true,
},
},
},
}
expectedResult := []bool{true, false, false, false, false, false, true, false, false, true}
for index, compositeQuery := range testCases {
expected := expectedResult[index]
actual := isAllQueriesDisabled(compositeQuery)
if actual != expected {
t.Errorf("Expected %v, but got %v", expected, actual)
}
}
}
func TestParseIntoRule(t *testing.T) {
tests := []struct {
name string
@@ -101,6 +179,9 @@ func TestParseIntoRule(t *testing.T) {
if rule.Frequency.Duration() != time.Minute {
t.Errorf("Expected default frequency '1m', got '%v'", rule.Frequency)
}
if rule.RuleCondition.CompositeQuery.BuilderQueries["A"].Expression != "A" {
t.Errorf("Expected expression 'A', got '%s'", rule.RuleCondition.CompositeQuery.BuilderQueries["A"].Expression)
}
},
},
{
@@ -236,7 +317,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
if spec.MatchType != rule.RuleCondition.MatchType {
t.Error("Expected MatchType to be copied from RuleCondition")
}
if spec.CompareOperator != rule.RuleCondition.CompareOperator {
if spec.CompareOp != rule.RuleCondition.CompareOp {
t.Error("Expected CompareOp to be copied from RuleCondition")
}
@@ -549,16 +630,9 @@ func TestParseIntoRuleThresholdGeneration(t *testing.T) {
}
// Test that threshold can evaluate properly
vector, err := threshold.Eval(&qbtypes.TimeSeries{
Values: []*qbtypes.TimeSeriesValue{{Value: 0.15, Timestamp: 1000}}, // 150ms in seconds
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "test",
},
Value: "label",
},
},
vector, err := threshold.Eval(v3.Series{
Points: []v3.Point{{Value: 0.15, Timestamp: 1000}}, // 150ms in seconds
Labels: map[string]string{"test": "label"},
}, "", EvalData{})
if err != nil {
t.Fatalf("Unexpected error in shouldAlert: %v", err)
@@ -634,16 +708,9 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
}
// Test with a value that should trigger both WARNING and CRITICAL thresholds
vector, err := threshold.Eval(&qbtypes.TimeSeries{
Values: []*qbtypes.TimeSeriesValue{{Value: 95.0, Timestamp: 1000}}, // 95% CPU usage
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
vector, err := threshold.Eval(v3.Series{
Points: []v3.Point{{Value: 95.0, Timestamp: 1000}}, // 95% CPU usage
Labels: map[string]string{"service": "test"},
}, "", EvalData{})
if err != nil {
t.Fatalf("Unexpected error in shouldAlert: %v", err)
@@ -651,16 +718,9 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
assert.Equal(t, 2, len(vector))
vector, err = threshold.Eval(&qbtypes.TimeSeries{
Values: []*qbtypes.TimeSeriesValue{{Value: 75.0, Timestamp: 1000}}, // 75% CPU usage
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
vector, err = threshold.Eval(v3.Series{
Points: []v3.Point{{Value: 75.0, Timestamp: 1000}}, // 75% CPU usage
Labels: map[string]string{"service": "test"},
}, "", EvalData{})
if err != nil {
t.Fatalf("Unexpected error in shouldAlert: %v", err)
@@ -673,7 +733,7 @@ func TestAnomalyNegationEval(t *testing.T) {
tests := []struct {
name string
ruleJSON []byte
series *qbtypes.TimeSeries
series v3.Series
shouldAlert bool
expectedValue float64
}{
@@ -702,16 +762,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A"
}
}`),
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
},
Value: "server1",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"host": "server1"},
Points: []v3.Point{
{Timestamp: 1000, Value: -2.1}, // below & at least once, should alert
{Timestamp: 2000, Value: -2.3},
},
@@ -744,16 +797,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A"
}
}`), // below & at least once, no value below -2.0
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
},
Value: "server1",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"host": "server1"},
Points: []v3.Point{
{Timestamp: 1000, Value: -1.9},
{Timestamp: 2000, Value: -1.8},
},
@@ -785,16 +831,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A"
}
}`), // above & at least once, should alert
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
},
Value: "server1",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"host": "server1"},
Points: []v3.Point{
{Timestamp: 1000, Value: 2.1}, // above 2.0, should alert
{Timestamp: 2000, Value: 2.2},
},
@@ -827,16 +866,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A"
}
}`),
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
},
Value: "server1",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"host": "server1"},
Points: []v3.Point{
{Timestamp: 1000, Value: 1.1},
{Timestamp: 2000, Value: 1.2},
},
@@ -868,16 +900,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A"
}
}`), // below and all the times
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
},
Value: "server1",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"host": "server1"},
Points: []v3.Point{
{Timestamp: 1000, Value: -2.1}, // all below -2
{Timestamp: 2000, Value: -2.2},
{Timestamp: 3000, Value: -2.5},
@@ -911,16 +936,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A"
}
}`),
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
},
Value: "server1",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"host": "server1"},
Points: []v3.Point{
{Timestamp: 1000, Value: -3.0},
{Timestamp: 2000, Value: -1.0}, // above -2, breaks condition
{Timestamp: 3000, Value: -2.5},
@@ -953,16 +971,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A"
}
}`),
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
},
Value: "server1",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"host": "server1"},
Points: []v3.Point{
{Timestamp: 1000, Value: -8.0}, // abs(8) >= 7, alert
{Timestamp: 2000, Value: 5.0},
},
@@ -995,16 +1006,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A"
}
}`),
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
},
Value: "server1",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"host": "server1"},
Points: []v3.Point{
{Timestamp: 1000, Value: 80.0}, // below 90, should alert
{Timestamp: 2000, Value: 85.0},
},
@@ -1037,16 +1041,9 @@ func TestAnomalyNegationEval(t *testing.T) {
"selectedQuery": "A"
}
}`),
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "host",
},
Value: "server1",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"host": "server1"},
Points: []v3.Point{
{Timestamp: 1000, Value: 60.0}, // below, should alert
{Timestamp: 2000, Value: 90.0},
},

View File

@@ -1,79 +0,0 @@
package ruletypes
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
)
type CompareOperator struct {
valuer.String
}
var (
ValueIsAbove = CompareOperator{valuer.NewString("1")}
ValueIsAboveLiteral = CompareOperator{valuer.NewString("above")}
ValueIsAboveSymbol = CompareOperator{valuer.NewString(">")}
ValueIsBelow = CompareOperator{valuer.NewString("2")}
ValueIsBelowLiteral = CompareOperator{valuer.NewString("below")}
ValueIsBelowSymbol = CompareOperator{valuer.NewString("<")}
ValueIsEq = CompareOperator{valuer.NewString("3")}
ValueIsEqLiteral = CompareOperator{valuer.NewString("equal")}
ValueIsEqLiteralShort = CompareOperator{valuer.NewString("eq")}
ValueIsEqSymbol = CompareOperator{valuer.NewString("=")}
ValueIsNotEq = CompareOperator{valuer.NewString("4")}
ValueIsNotEqLiteral = CompareOperator{valuer.NewString("not_equal")}
ValueIsNotEqLiteralShort = CompareOperator{valuer.NewString("not_eq")}
ValueIsNotEqSymbol = CompareOperator{valuer.NewString("!=")}
ValueAboveOrEq = CompareOperator{valuer.NewString("5")}
ValueAboveOrEqLiteral = CompareOperator{valuer.NewString("above_or_equal")}
ValueAboveOrEqLiteralShort = CompareOperator{valuer.NewString("above_or_eq")}
ValueAboveOrEqSymbol = CompareOperator{valuer.NewString(">=")}
ValueBelowOrEq = CompareOperator{valuer.NewString("6")}
ValueBelowOrEqLiteral = CompareOperator{valuer.NewString("below_or_equal")}
ValueBelowOrEqLiteralShort = CompareOperator{valuer.NewString("below_or_eq")}
ValueBelowOrEqSymbol = CompareOperator{valuer.NewString("<=")}
ValueOutsideBounds = CompareOperator{valuer.NewString("7")}
ValueOutsideBoundsLiteral = CompareOperator{valuer.NewString("outside_bounds")}
)
func (CompareOperator) Enum() []any {
return []any{
ValueIsAboveLiteral,
ValueIsBelowLiteral,
ValueIsEqLiteral,
ValueIsNotEqLiteral,
// ValueAboveOrEqLiteral,
// ValueBelowOrEqLiteral,
ValueOutsideBoundsLiteral,
}
}
func (c CompareOperator) Validate() error {
switch c {
case ValueIsAbove,
ValueIsAboveLiteral,
ValueIsAboveSymbol,
ValueIsBelow,
ValueIsBelowLiteral,
ValueIsBelowSymbol,
ValueIsEq,
ValueIsEqLiteral,
ValueIsEqLiteralShort,
ValueIsEqSymbol,
ValueIsNotEq,
ValueIsNotEqLiteral,
ValueIsNotEqLiteralShort,
ValueIsNotEqSymbol,
ValueOutsideBounds,
ValueOutsideBoundsLiteral:
return nil
default:
return errors.NewInvalidInputf(errors.CodeInvalidInput, "unknown comparison operator, known values are: ")
}
}

View File

@@ -8,5 +8,5 @@ const (
LabelThresholdName = "threshold.name"
LabelSeverityName = "severity"
LabelLastSeen = "lastSeen"
LabelRuleID = "ruleId"
LabelRuleId = "ruleId"
)

View File

@@ -1,316 +0,0 @@
package ruletypes
import (
"bytes"
"encoding/json"
"sort"
"strconv"
"strings"
"github.com/cespare/xxhash/v2"
)
const sep = '\xff'
// Well-known label names used by Prometheus components.
const (
MetricNameLabel = "__name__"
TemporalityLabel = "__temporality__"
AlertNameLabel = "alertname"
TestAlertLabel = "testalert"
NoDataLabel = "nodata"
// AlertStateLabel is the label name indicating the state of an alert.
AlertStateLabel = "alertstate"
AlertRuleIDLabel = "ruleId"
RuleSourceLabel = "ruleSource"
RuleThresholdLabel = "threshold"
AlertSummaryLabel = "summary"
AlertDescriptionLabel = "description"
)
// Label is a key/value pair of strings.
// TODO(srikanthccv): https://github.com/SigNoz/signoz/issues/9232?
type Label struct {
Name, Value string
}
// Labels is a sorted set of labels. Order has to be guaranteed upon
// instantiation.
type Labels []Label
func (ls Labels) Len() int { return len(ls) }
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
func (ls Labels) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
func (ls Labels) String() string {
var b bytes.Buffer
b.WriteByte('{')
for i, l := range ls {
if i > 0 {
b.WriteByte(',')
b.WriteByte(' ')
}
b.WriteString(l.Name)
b.WriteByte('=')
b.WriteString(strconv.Quote(l.Value))
}
b.WriteByte('}')
return b.String()
}
// MarshalJSON implements json.Marshaler.
func (ls Labels) MarshalJSON() ([]byte, error) {
return json.Marshal(ls.Map())
}
// UnmarshalJSON implements json.Unmarshaler.
func (ls *Labels) UnmarshalJSON(b []byte) error {
var m map[string]string
if err := json.Unmarshal(b, &m); err != nil {
return err
}
*ls = FromMap(m)
return nil
}
// Hash returns a hash value for the label set.
func (ls Labels) Hash() uint64 {
b := make([]byte, 0, 1024)
for _, v := range ls {
b = append(b, v.Name...)
b = append(b, sep)
b = append(b, v.Value...)
b = append(b, sep)
}
return xxhash.Sum64(b)
}
// HashForLabels returns a hash value for the labels matching the provided names.
func (ls Labels) HashForLabels(b []byte, names ...string) (uint64, []byte) {
var seps = []byte{'\xff'}
b = b[:0]
i, j := 0, 0
for i < len(ls) && j < len(names) {
if names[j] < ls[i].Name {
j++
} else if ls[i].Name < names[j] {
i++
} else {
b = append(b, ls[i].Name...)
b = append(b, seps[0])
b = append(b, ls[i].Value...)
b = append(b, seps[0])
i++
j++
}
}
return xxhash.Sum64(b), b
}
// HashWithoutLabels returns a hash value for all labels except those matching
// the provided names.
func (ls Labels) HashWithoutLabels(names ...string) uint64 {
b := make([]byte, 0, 1024)
Outer:
for _, v := range ls {
if v.Name == MetricNameLabel {
continue
}
for _, n := range names {
if v.Name == n {
continue Outer
}
}
b = append(b, v.Name...)
b = append(b, sep)
b = append(b, v.Value...)
b = append(b, sep)
}
return xxhash.Sum64(b)
}
// Copy returns a copy of the labels.
func (ls Labels) Copy() Labels {
res := make(Labels, len(ls))
copy(res, ls)
return res
}
// Get returns the value for the label with the given name.
// Returns an empty string if the label doesn't exist.
func (ls Labels) Get(name string) string {
for _, l := range ls {
if l.Name == name {
return l.Value
}
}
return ""
}
// Has returns true if the label with the given name is present.
func (ls Labels) Has(name string) bool {
for _, l := range ls {
if l.Name == name {
return true
}
}
return false
}
// Equal returns whether the two label sets are equal.
func Equal(ls, o Labels) bool {
if len(ls) != len(o) {
return false
}
for i, l := range ls {
if l.Name != o[i].Name || l.Value != o[i].Value {
return false
}
}
return true
}
// Map returns a string map of the labels.
func (ls Labels) Map() map[string]string {
m := make(map[string]string, len(ls))
for _, l := range ls {
m[l.Name] = l.Value
}
return m
}
// New returns a sorted Labels from the given labels.
// The caller has to guarantee that all label names are unique.
func New(ls ...Label) Labels {
set := make(Labels, 0, len(ls))
for _, l := range ls {
set = append(set, l)
}
sort.Sort(set)
return set
}
// FromMap returns new sorted Labels from the given map.
func FromMap(m map[string]string) Labels {
l := make([]Label, 0, len(m))
for k, v := range m {
l = append(l, Label{Name: k, Value: v})
}
return New(l...)
}
// FromStrings creates new labels from pairs of strings.
func FromStrings(ss ...string) Labels {
if len(ss)%2 != 0 {
panic("invalid number of strings")
}
var res Labels
for i := 0; i < len(ss); i += 2 {
res = append(res, Label{Name: ss[i], Value: ss[i+1]})
}
sort.Sort(res)
return res
}
// Compare compares the two label sets.
// The result will be 0 if a==b, <0 if a < b, and >0 if a > b.
func Compare(a, b Labels) int {
l := len(a)
if len(b) < l {
l = len(b)
}
for i := 0; i < l; i++ {
if d := strings.Compare(a[i].Name, b[i].Name); d != 0 {
return d
}
if d := strings.Compare(a[i].Value, b[i].Value); d != 0 {
return d
}
}
// If all labels so far were in common, the set with fewer labels comes first.
return len(a) - len(b)
}
// Builder allows modifiying Labels.
type Builder struct {
base Labels
del []string
add []Label
}
// NewBuilder returns a new LabelsBuilder
func NewBuilder(base ...Label) *Builder {
return &Builder{
base: base,
del: make([]string, 0, 5),
add: make([]Label, 0, 5),
}
}
// Del deletes the label of the given name.
func (b *Builder) Del(ns ...string) *Builder {
for _, n := range ns {
for i, a := range b.add {
if a.Name == n {
b.add = append(b.add[:i], b.add[i+1:]...)
}
}
b.del = append(b.del, n)
}
return b
}
// Set the name/value pair as a label.
func (b *Builder) Set(n, v string) *Builder {
for i, a := range b.add {
if a.Name == n {
b.add[i].Value = v
return b
}
}
b.add = append(b.add, Label{Name: n, Value: v})
return b
}
// Labels returns the labels from the builder. If no modifications
// were made, the original labels are returned.
func (b *Builder) Labels() Labels {
if len(b.del) == 0 && len(b.add) == 0 {
return b.base
}
// In the general case, labels are removed, modified or moved
// rather than added.
res := make(Labels, 0, len(b.base))
Outer:
for _, l := range b.base {
for _, n := range b.del {
if l.Name == n {
continue Outer
}
}
for _, la := range b.add {
if l.Name == la.Name {
continue Outer
}
}
res = append(res, l)
}
res = append(res, b.add...)
sort.Sort(res)
return res
}

View File

@@ -1,60 +0,0 @@
package ruletypes
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
)
type MatchType struct {
valuer.String
}
var (
AtleastOnce = MatchType{valuer.NewString("1")}
AtleastOnceLiteral = MatchType{valuer.NewString("atleast_once")}
AllTheTimes = MatchType{valuer.NewString("2")}
AllTheTimesLiteral = MatchType{valuer.NewString("all_the_times")}
OnAverage = MatchType{valuer.NewString("3")}
OnAverageLiteral = MatchType{valuer.NewString("on_average")}
OnAverageShort = MatchType{valuer.NewString("avg")}
InTotal = MatchType{valuer.NewString("4")}
InTotalLiteral = MatchType{valuer.NewString("in_total")}
InTotalShort = MatchType{valuer.NewString("sum")}
Last = MatchType{valuer.NewString("5")}
LastLiteral = MatchType{valuer.NewString("last")}
)
func (MatchType) Enum() []any {
return []any{
AtleastOnceLiteral,
AllTheTimesLiteral,
OnAverageLiteral,
InTotalLiteral,
LastLiteral,
}
}
func (m MatchType) Validate() error {
switch m {
case
AtleastOnce,
AtleastOnceLiteral,
AllTheTimes,
AllTheTimesLiteral,
OnAverage,
OnAverageLiteral,
OnAverageShort,
InTotal,
InTotalLiteral,
InTotalShort,
Last,
LastLiteral:
return nil
default:
return errors.NewInvalidInputf(errors.CodeInvalidInput, "unknown match type operator, known values are")
}
}

View File

@@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"strconv"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
)
// common result format of query
@@ -13,7 +15,7 @@ type Vector []Sample
type Sample struct {
Point
Metric Labels
Metric labels.Labels
IsMissing bool
@@ -32,8 +34,8 @@ func (s Sample) String() string {
func (s Sample) MarshalJSON() ([]byte, error) {
v := struct {
M Labels `json:"metric"`
V Point `json:"value"`
M labels.Labels `json:"metric"`
V Point `json:"value"`
}{
M: s.Metric,
V: s.Point,
@@ -55,5 +57,5 @@ func (p Point) String() string {
// MarshalJSON implements json.Marshaler.
func (p Point) MarshalJSON() ([]byte, error) {
v := strconv.FormatFloat(p.V, 'f', -1, 64)
return json.Marshal([...]any{float64(p.T) / 1000, v})
return json.Marshal([...]interface{}{float64(p.T) / 1000, v})
}

View File

@@ -28,7 +28,7 @@ func NewStatsFromRules(rules []*Rule) map[string]any {
continue
}
key := "rule.type." + strings.TrimSuffix(strings.ToLower(gettableRule.RuleType.StringValue()), "_rule") + ".count"
key := "rule.type." + strings.TrimSuffix(strings.ToLower(string(gettableRule.RuleType)), "_rule") + ".count"
if _, ok := stats[key]; !ok {
stats[key] = int64(1)
} else {

View File

@@ -1,13 +0,0 @@
package ruletypes
import "github.com/SigNoz/signoz/pkg/valuer"
type RuleHealth struct {
valuer.String
}
var (
HealthUnknown = RuleHealth{valuer.NewString("unknown")}
HealthGood = RuleHealth{valuer.NewString("ok")}
HealthBad = RuleHealth{valuer.NewString("err")}
)

View File

@@ -1,36 +0,0 @@
package ruletypes
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
)
type RuleType struct {
valuer.String
}
var (
RuleTypeThreshold = RuleType{valuer.NewString("threshold_rule")}
RuleTypeProm = RuleType{valuer.NewString("promql_rule")}
RuleTypeAnomaly = RuleType{valuer.NewString("anomaly_rule")}
)
func (RuleType) Enum() []any {
return []any{
RuleTypeThreshold,
RuleTypeProm,
RuleTypeAnomaly,
}
}
func (r RuleType) Validate() error {
switch r {
case
RuleTypeThreshold,
RuleTypeProm,
RuleTypeAnomaly:
return nil
default:
return errors.NewInvalidInputf(errors.CodeInvalidInput, "unknown rule type, known values are")
}
}

View File

@@ -9,15 +9,16 @@ import (
"regexp"
"sort"
"strings"
"time"
"unicode"
"github.com/SigNoz/signoz/pkg/errors"
htmltpl "html/template"
texttpl "text/template"
html_template "html/template"
text_template "text/template"
"golang.org/x/text/cases"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
)
// this file contains all the methods and structs
@@ -51,25 +52,8 @@ func (q tmplQueryResultsByLabelSorter) Swap(i, j int) {
type TemplateExpander struct {
text string
name string
data any
funcMap texttpl.FuncMap
}
func NormalizeLabelName(name string) string {
// See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
// Regular expression to match non-alphanumeric characters except underscores
reg := regexp.MustCompile(`[^a-zA-Z0-9_]`)
// Replace all non-alphanumeric characters except underscores with underscores
normalized := reg.ReplaceAllString(name, "_")
// If the first character is not a letter or an underscore, prepend an underscore
if len(normalized) > 0 && !unicode.IsLetter(rune(normalized[0])) && normalized[0] != '_' {
normalized = "_" + normalized
}
return normalized
data interface{}
funcMap text_template.FuncMap
}
// NewTemplateExpander returns a template expander ready to use.
@@ -77,14 +61,15 @@ func NewTemplateExpander(
ctx context.Context,
text string,
name string,
data any,
data interface{},
timestamp times.Time,
externalURL *url.URL,
) *TemplateExpander {
return &TemplateExpander{
text: text,
name: name,
data: data,
funcMap: texttpl.FuncMap{
funcMap: text_template.FuncMap{
"first": func(v tmplQueryResults) (*tmplQueryRecord, error) {
if len(v) > 0 {
return v[0], nil
@@ -100,8 +85,8 @@ func NewTemplateExpander(
"strvalue": func(s *tmplQueryRecord) string {
return s.Labels["__value__"]
},
"args": func(args ...any) map[string]any {
result := make(map[string]any)
"args": func(args ...interface{}) map[string]interface{} {
result := make(map[string]interface{})
for i, a := range args {
result[fmt.Sprintf("arg%d", i)] = a
}
@@ -111,8 +96,8 @@ func NewTemplateExpander(
re := regexp.MustCompile(pattern)
return re.ReplaceAllString(text, repl)
},
"safeHtml": func(text string) htmltpl.HTML {
return htmltpl.HTML(text)
"safeHtml": func(text string) html_template.HTML {
return html_template.HTML(text)
},
"match": regexp.MatchString,
"title": cases.Title,
@@ -206,7 +191,7 @@ func NewTemplateExpander(
if math.IsNaN(v) || math.IsInf(v, 0) {
return fmt.Sprintf("%.4g", v)
}
t := time.Unix(0, int64(v*1e9)).UTC()
t := times.TimeFromUnixNano(int64(v * 1e9)).Time().UTC()
return fmt.Sprint(t)
},
"pathPrefix": func() string {
@@ -220,7 +205,7 @@ func NewTemplateExpander(
}
// AlertTemplateData returns the interface to be used in expanding the template.
func AlertTemplateData(labels map[string]string, value string, threshold string) any {
func AlertTemplateData(labels map[string]string, value string, threshold string) interface{} {
// This exists here for backwards compatibility.
// The labels map passed in no longer contains the normalized labels.
// To continue supporting the old way of referencing labels, we need to
@@ -229,7 +214,7 @@ func AlertTemplateData(labels map[string]string, value string, threshold string)
newLabels := make(map[string]string)
for k, v := range labels {
newLabels[k] = v
newLabels[NormalizeLabelName(k)] = v
newLabels[common.NormalizeLabelName(k)] = v
}
return struct {
@@ -290,7 +275,7 @@ func (te *TemplateExpander) preprocessTemplate() {
// Funcs adds the functions in fm to the Expander's function map.
// Existing functions will be overwritten in case of conflict.
func (te TemplateExpander) Funcs(fm texttpl.FuncMap) {
func (te TemplateExpander) Funcs(fm text_template.FuncMap) {
for k, v := range fm {
te.funcMap[k] = v
}
@@ -312,7 +297,7 @@ func (te TemplateExpander) Expand() (result string, resultErr error) {
te.preprocessTemplate()
tmpl, err := texttpl.New(te.name).Funcs(te.funcMap).Option("missingkey=zero").Parse(te.text)
tmpl, err := text_template.New(te.name).Funcs(te.funcMap).Option("missingkey=zero").Parse(te.text)
if err != nil {
return "", errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "error parsing template %v", te.name)
}
@@ -336,13 +321,13 @@ func (te TemplateExpander) ExpandHTML(templateFiles []string) (result string, re
}
}()
tmpl := htmltpl.New(te.name).Funcs(htmltpl.FuncMap(te.funcMap))
tmpl := html_template.New(te.name).Funcs(html_template.FuncMap(te.funcMap))
tmpl.Option("missingkey=zero")
tmpl.Funcs(htmltpl.FuncMap{
"tmpl": func(name string, data any) (htmltpl.HTML, error) {
tmpl.Funcs(html_template.FuncMap{
"tmpl": func(name string, data interface{}) (html_template.HTML, error) {
var buffer bytes.Buffer
err := tmpl.ExecuteTemplate(&buffer, name, data)
return htmltpl.HTML(buffer.String()), err
return html_template.HTML(buffer.String()), err
},
})
tmpl, err := tmpl.Parse(te.text)
@@ -366,7 +351,7 @@ func (te TemplateExpander) ExpandHTML(templateFiles []string) (result string, re
// ParseTest parses the templates and returns the error if any.
func (te TemplateExpander) ParseTest() error {
te.preprocessTemplate()
_, err := texttpl.New(te.name).Funcs(te.funcMap).Option("missingkey=zero").Parse(te.text)
_, err := text_template.New(te.name).Funcs(te.funcMap).Option("missingkey=zero").Parse(te.text)
if err != nil {
return err
}

View File

@@ -3,14 +3,16 @@ package ruletypes
import (
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/query-service/utils/times"
"github.com/stretchr/testify/require"
)
func TestTemplateExpander(t *testing.T) {
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
data := AlertTemplateData(map[string]string{"service.name": "my-service"}, "100", "200")
expander := NewTemplateExpander(context.Background(), defs+"test $service.name", "test", data, nil)
expander := NewTemplateExpander(context.Background(), defs+"test $service.name", "test", data, times.Time(time.Now().Unix()), nil)
result, err := expander.Expand()
if err != nil {
t.Fatal(err)
@@ -21,7 +23,7 @@ func TestTemplateExpander(t *testing.T) {
func TestTemplateExpander_WithThreshold(t *testing.T) {
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
data := AlertTemplateData(map[string]string{"service.name": "my-service"}, "200", "100")
expander := NewTemplateExpander(context.Background(), defs+"test $service.name exceeds {{$threshold}} and observed at {{$value}}", "test", data, nil)
expander := NewTemplateExpander(context.Background(), defs+"test $service.name exceeds {{$threshold}} and observed at {{$value}}", "test", data, times.Time(time.Now().Unix()), nil)
result, err := expander.Expand()
if err != nil {
t.Fatal(err)
@@ -32,7 +34,7 @@ func TestTemplateExpander_WithThreshold(t *testing.T) {
func TestTemplateExpanderOldVariableSyntax(t *testing.T) {
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
data := AlertTemplateData(map[string]string{"service.name": "my-service"}, "200", "100")
expander := NewTemplateExpander(context.Background(), defs+"test {{.Labels.service_name}} exceeds {{$threshold}} and observed at {{$value}}", "test", data, nil)
expander := NewTemplateExpander(context.Background(), defs+"test {{.Labels.service_name}} exceeds {{$threshold}} and observed at {{$value}}", "test", data, times.Time(time.Now().Unix()), nil)
result, err := expander.Expand()
if err != nil {
t.Fatal(err)
@@ -43,7 +45,7 @@ func TestTemplateExpanderOldVariableSyntax(t *testing.T) {
func TestTemplateExpander_WithAlreadyNormalizedKey(t *testing.T) {
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
data := AlertTemplateData(map[string]string{"service_name": "my-service"}, "200", "100")
expander := NewTemplateExpander(context.Background(), defs+"test {{.Labels.service_name}} exceeds {{$threshold}} and observed at {{$value}}", "test", data, nil)
expander := NewTemplateExpander(context.Background(), defs+"test {{.Labels.service_name}} exceeds {{$threshold}} and observed at {{$value}}", "test", data, times.Time(time.Now().Unix()), nil)
result, err := expander.Expand()
if err != nil {
t.Fatal(err)
@@ -54,7 +56,7 @@ func TestTemplateExpander_WithAlreadyNormalizedKey(t *testing.T) {
func TestTemplateExpander_WithMissingKey(t *testing.T) {
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
data := AlertTemplateData(map[string]string{"service_name": "my-service"}, "200", "100")
expander := NewTemplateExpander(context.Background(), defs+"test {{.Labels.missing_key}} exceeds {{$threshold}} and observed at {{$value}}", "test", data, nil)
expander := NewTemplateExpander(context.Background(), defs+"test {{.Labels.missing_key}} exceeds {{$threshold}} and observed at {{$value}}", "test", data, times.Time(time.Now().Unix()), nil)
result, err := expander.Expand()
if err != nil {
t.Fatal(err)
@@ -65,7 +67,7 @@ func TestTemplateExpander_WithMissingKey(t *testing.T) {
func TestTemplateExpander_WithLablesDotSyntax(t *testing.T) {
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
data := AlertTemplateData(map[string]string{"service.name": "my-service"}, "200", "100")
expander := NewTemplateExpander(context.Background(), defs+"test {{.Labels.service.name}} exceeds {{$threshold}} and observed at {{$value}}", "test", data, nil)
expander := NewTemplateExpander(context.Background(), defs+"test {{.Labels.service.name}} exceeds {{$threshold}} and observed at {{$value}}", "test", data, times.Time(time.Now().Unix()), nil)
result, err := expander.Expand()
if err != nil {
t.Fatal(err)
@@ -76,7 +78,7 @@ func TestTemplateExpander_WithLablesDotSyntax(t *testing.T) {
func TestTemplateExpander_WithVariableSyntax(t *testing.T) {
defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}"
data := AlertTemplateData(map[string]string{"service.name": "my-service"}, "200", "100")
expander := NewTemplateExpander(context.Background(), defs+"test {{$service.name}} exceeds {{$threshold}} and observed at {{$value}}", "test", data, nil)
expander := NewTemplateExpander(context.Background(), defs+"test {{$service.name}} exceeds {{$threshold}} and observed at {{$value}}", "test", data, times.Time(time.Now().Unix()), nil)
result, err := expander.Expand()
if err != nil {
t.Fatal(err)

View File

@@ -2,13 +2,13 @@ package ruletypes
import (
"encoding/json"
"fmt"
"math"
"sort"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/units"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -38,7 +38,7 @@ func (r *RuleThresholdData) UnmarshalJSON(data []byte) error {
case BasicThresholdKind:
var basicThresholds BasicRuleThresholds
if err := json.Unmarshal(raw["spec"], &basicThresholds); err != nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to unmarshal rule threshold spec: %v", err)
return errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to unmarshal rule threhsold spec: %v", err)
}
if err := basicThresholds.Validate(); err != nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid rule threshold spec: %v", err)
@@ -70,7 +70,7 @@ type EvalData struct {
SendUnmatched bool
}
// HasActiveAlert checks if the given sample fingerprint is active
// HasActiveAlert checks if the given sample figerprint is active
// as an alert.
func (eval EvalData) HasActiveAlert(sampleLabelFp uint64) bool {
if len(eval.ActiveAlerts) == 0 {
@@ -83,18 +83,18 @@ func (eval EvalData) HasActiveAlert(sampleLabelFp uint64) bool {
type RuleThreshold interface {
// Eval runs the given series through the threshold rules
// using the given EvalData and returns the matching series
Eval(series *qbtypes.TimeSeries, unit string, evalData EvalData) (Vector, error)
Eval(series v3.Series, unit string, evalData EvalData) (Vector, error)
GetRuleReceivers() []RuleReceivers
}
type BasicRuleThreshold struct {
Name string `json:"name"`
TargetValue *float64 `json:"target"`
TargetUnit string `json:"targetUnit"`
RecoveryTarget *float64 `json:"recoveryTarget"`
MatchType MatchType `json:"matchType"`
CompareOperator CompareOperator `json:"op"`
Channels []string `json:"channels"`
Name string `json:"name"`
TargetValue *float64 `json:"target"`
TargetUnit string `json:"targetUnit"`
RecoveryTarget *float64 `json:"recoveryTarget"`
MatchType MatchType `json:"matchType"`
CompareOp CompareOp `json:"op"`
Channels []string `json:"channels"`
}
type BasicRuleThresholds []BasicRuleThreshold
@@ -122,13 +122,7 @@ func (r BasicRuleThresholds) Validate() error {
return errors.Join(errs...)
}
func (r BasicRuleThresholds) Eval(s *qbtypes.TimeSeries, unit string, evalData EvalData) (Vector, error) {
series := &qbtypes.TimeSeries{
Labels: s.Labels,
Values: s.EvaluableValues(),
}
func (r BasicRuleThresholds) Eval(series v3.Series, unit string, evalData EvalData) (Vector, error) {
var resultVector Vector
thresholds := []BasicRuleThreshold(r)
sortThresholds(thresholds)
@@ -144,12 +138,13 @@ func (r BasicRuleThresholds) Eval(s *qbtypes.TimeSeries, unit string, evalData E
continue
} else if evalData.SendUnmatched {
// Sanitise the series points to remove any NaN or Inf values
if len(series.Values) == 0 {
series.Points = removeGroupinSetPoints(series)
if len(series.Points) == 0 {
continue
}
// prepare the sample with the first point of the series
smpl := Sample{
Point: Point{T: series.Values[0].Timestamp, V: series.Values[0].Value},
Point: Point{T: series.Points[0].Timestamp, V: series.Points[0].Value},
Metric: PrepareSampleLabelsForRule(series.Labels, threshold.Name),
Target: *threshold.TargetValue,
TargetUnit: threshold.TargetUnit,
@@ -186,10 +181,11 @@ func (r BasicRuleThresholds) Eval(s *qbtypes.TimeSeries, unit string, evalData E
func sortThresholds(thresholds []BasicRuleThreshold) {
sort.Slice(thresholds, func(i, j int) bool {
compareOp := thresholds[i].getCompareOp()
targetI := thresholds[i].target(thresholds[i].TargetUnit) //for sorting we dont need rule unit
targetJ := thresholds[j].target(thresholds[j].TargetUnit)
switch thresholds[i].CompareOperator {
switch compareOp {
case ValueIsAbove, ValueAboveOrEq, ValueOutsideBounds:
// For "above" operations, sort descending (higher values first)
return targetI > targetJ
@@ -224,6 +220,10 @@ func (b BasicRuleThreshold) recoveryTarget(ruleUnit string) float64 {
return b.convertToRuleUnit(*b.RecoveryTarget, ruleUnit)
}
func (b BasicRuleThreshold) getCompareOp() CompareOp {
return b.CompareOp
}
func (b BasicRuleThreshold) Validate() error {
var errs []error
if b.Name == "" {
@@ -234,87 +234,105 @@ func (b BasicRuleThreshold) Validate() error {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "target value cannot be nil"))
}
switch b.CompareOperator {
switch b.CompareOp {
case ValueIsAbove, ValueIsBelow, ValueIsEq, ValueIsNotEq, ValueAboveOrEq, ValueBelowOrEq, ValueOutsideBounds:
// valid compare operations
case CompareOpNone:
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "compare operation cannot be none"))
default:
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid compare operation: %s", b.CompareOperator.StringValue()))
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid compare operation: %s", string(b.CompareOp)))
}
switch b.MatchType {
case AtleastOnce, AllTheTimes, OnAverage, InTotal, Last:
// valid match types
case MatchTypeNone:
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "match type cannot be none"))
default:
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid match type: %s", b.MatchType.StringValue()))
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid match type: %s", string(b.MatchType)))
}
return errors.Join(errs...)
}
func (b BasicRuleThreshold) matchesRecoveryThreshold(series *qbtypes.TimeSeries, ruleUnit string) (Sample, bool) {
func (b BasicRuleThreshold) matchesRecoveryThreshold(series v3.Series, ruleUnit string) (Sample, bool) {
return b.shouldAlertWithTarget(series, b.recoveryTarget(ruleUnit))
}
func (b BasicRuleThreshold) shouldAlert(series *qbtypes.TimeSeries, ruleUnit string) (Sample, bool) {
func (b BasicRuleThreshold) shouldAlert(series v3.Series, ruleUnit string) (Sample, bool) {
return b.shouldAlertWithTarget(series, b.target(ruleUnit))
}
func removeGroupinSetPoints(series v3.Series) []v3.Point {
var result []v3.Point
for _, s := range series.Points {
if s.Timestamp >= 0 && !math.IsNaN(s.Value) && !math.IsInf(s.Value, 0) {
result = append(result, s)
}
}
return result
}
// PrepareSampleLabelsForRule prepares the labels for the sample to be used in the alerting.
// It accepts seriesLabels and thresholdName as input and returns the labels with the threshold name label added.
func PrepareSampleLabelsForRule(seriesLabels []*qbtypes.Label, thresholdName string) Labels {
lb := NewBuilder()
for _, label := range seriesLabels {
lb.Set(label.Key.Name, fmt.Sprint(label.Value))
func PrepareSampleLabelsForRule(seriesLabels map[string]string, thresholdName string) (lbls labels.Labels) {
lb := labels.NewBuilder(labels.Labels{})
for name, value := range seriesLabels {
lb.Set(name, value)
}
lb.Set(LabelThresholdName, thresholdName)
lb.Set(LabelSeverityName, strings.ToLower(thresholdName))
return lb.Labels()
}
func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, target float64) (Sample, bool) {
func (b BasicRuleThreshold) shouldAlertWithTarget(series v3.Series, target float64) (Sample, bool) {
var shouldAlert bool
var alertSmpl Sample
lbls := PrepareSampleLabelsForRule(series.Labels, b.Name)
series.Points = removeGroupinSetPoints(series)
// nothing to evaluate
if len(series.Values) == 0 {
if len(series.Points) == 0 {
return alertSmpl, false
}
switch b.MatchType {
case AtleastOnce:
// If any sample matches the condition, the rule is firing.
if b.CompareOperator == ValueIsAbove {
for _, smpl := range series.Values {
if b.CompareOp == ValueIsAbove {
for _, smpl := range series.Points {
if smpl.Value > target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOperator == ValueIsBelow {
for _, smpl := range series.Values {
} else if b.CompareOp == ValueIsBelow {
for _, smpl := range series.Points {
if smpl.Value < target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOperator == ValueIsEq {
for _, smpl := range series.Values {
} else if b.CompareOp == ValueIsEq {
for _, smpl := range series.Points {
if smpl.Value == target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOperator == ValueIsNotEq {
for _, smpl := range series.Values {
} else if b.CompareOp == ValueIsNotEq {
for _, smpl := range series.Points {
if smpl.Value != target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOperator == ValueOutsideBounds {
for _, smpl := range series.Values {
} else if b.CompareOp == ValueOutsideBounds {
for _, smpl := range series.Points {
if math.Abs(smpl.Value) >= target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
@@ -326,8 +344,8 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
// If all samples match the condition, the rule is firing.
shouldAlert = true
alertSmpl = Sample{Point: Point{V: target}, Metric: lbls}
if b.CompareOperator == ValueIsAbove {
for _, smpl := range series.Values {
if b.CompareOp == ValueIsAbove {
for _, smpl := range series.Points {
if smpl.Value <= target {
shouldAlert = false
break
@@ -336,15 +354,15 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
// use min value from the series
if shouldAlert {
var minValue = math.Inf(1)
for _, smpl := range series.Values {
for _, smpl := range series.Points {
if smpl.Value < minValue {
minValue = smpl.Value
}
}
alertSmpl = Sample{Point: Point{V: minValue}, Metric: lbls}
}
} else if b.CompareOperator == ValueIsBelow {
for _, smpl := range series.Values {
} else if b.CompareOp == ValueIsBelow {
for _, smpl := range series.Points {
if smpl.Value >= target {
shouldAlert = false
break
@@ -352,22 +370,22 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
}
if shouldAlert {
var maxValue = math.Inf(-1)
for _, smpl := range series.Values {
for _, smpl := range series.Points {
if smpl.Value > maxValue {
maxValue = smpl.Value
}
}
alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lbls}
}
} else if b.CompareOperator == ValueIsEq {
for _, smpl := range series.Values {
} else if b.CompareOp == ValueIsEq {
for _, smpl := range series.Points {
if smpl.Value != target {
shouldAlert = false
break
}
}
} else if b.CompareOperator == ValueIsNotEq {
for _, smpl := range series.Values {
} else if b.CompareOp == ValueIsNotEq {
for _, smpl := range series.Points {
if smpl.Value == target {
shouldAlert = false
break
@@ -375,15 +393,15 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
}
// use any non-inf or nan value from the series
if shouldAlert {
for _, smpl := range series.Values {
for _, smpl := range series.Points {
if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
break
}
}
}
} else if b.CompareOperator == ValueOutsideBounds {
for _, smpl := range series.Values {
} else if b.CompareOp == ValueOutsideBounds {
for _, smpl := range series.Points {
if math.Abs(smpl.Value) < target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = false
@@ -394,7 +412,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
case OnAverage:
// If the average of all samples matches the condition, the rule is firing.
var sum, count float64
for _, smpl := range series.Values {
for _, smpl := range series.Points {
if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
continue
}
@@ -403,7 +421,7 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
}
avg := sum / count
alertSmpl = Sample{Point: Point{V: avg}, Metric: lbls}
switch b.CompareOperator {
switch b.CompareOp {
case ValueIsAbove:
if avg > target {
shouldAlert = true
@@ -429,14 +447,14 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
// If the sum of all samples matches the condition, the rule is firing.
var sum float64
for _, smpl := range series.Values {
for _, smpl := range series.Points {
if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
continue
}
sum += smpl.Value
}
alertSmpl = Sample{Point: Point{V: sum}, Metric: lbls}
switch b.CompareOperator {
switch b.CompareOp {
case ValueIsAbove:
if sum > target {
shouldAlert = true
@@ -461,22 +479,22 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
case Last:
// If the last sample matches the condition, the rule is firing.
shouldAlert = false
alertSmpl = Sample{Point: Point{V: series.Values[len(series.Values)-1].Value}, Metric: lbls}
switch b.CompareOperator {
alertSmpl = Sample{Point: Point{V: series.Points[len(series.Points)-1].Value}, Metric: lbls}
switch b.CompareOp {
case ValueIsAbove:
if series.Values[len(series.Values)-1].Value > target {
if series.Points[len(series.Points)-1].Value > target {
shouldAlert = true
}
case ValueIsBelow:
if series.Values[len(series.Values)-1].Value < target {
if series.Points[len(series.Points)-1].Value < target {
shouldAlert = true
}
case ValueIsEq:
if series.Values[len(series.Values)-1].Value == target {
if series.Points[len(series.Points)-1].Value == target {
shouldAlert = true
}
case ValueIsNotEq:
if series.Values[len(series.Values)-1].Value != target {
if series.Points[len(series.Points)-1].Value != target {
shouldAlert = true
}
}

View File

@@ -6,8 +6,7 @@ import (
"github.com/stretchr/testify/assert"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
)
func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
@@ -16,29 +15,22 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
tests := []struct {
name string
threshold BasicRuleThreshold
series *qbtypes.TimeSeries
series v3.Series
ruleUnit string
shouldAlert bool
}{
{
name: "milliseconds to seconds conversion - should alert",
threshold: BasicRuleThreshold{
Name: CriticalThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: AtleastOnce,
CompareOperator: ValueIsAbove,
Name: CriticalThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: AtleastOnce,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 0.15, Timestamp: 1000}, // 150ms in seconds
},
},
@@ -48,22 +40,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "milliseconds to seconds conversion - should not alert",
threshold: BasicRuleThreshold{
Name: WarningThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: AtleastOnce,
CompareOperator: ValueIsAbove,
Name: WarningThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: AtleastOnce,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 0.05, Timestamp: 1000}, // 50ms in seconds
},
},
@@ -73,22 +58,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "seconds to milliseconds conversion - should alert",
threshold: BasicRuleThreshold{
Name: CriticalThresholdName,
TargetValue: &target, // 100s
TargetUnit: "s",
MatchType: AtleastOnce,
CompareOperator: ValueIsAbove,
Name: CriticalThresholdName,
TargetValue: &target, // 100s
TargetUnit: "s",
MatchType: AtleastOnce,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 150000, Timestamp: 1000}, // 150000ms = 150s
},
},
@@ -99,22 +77,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "bytes to kibibytes conversion - should alert",
threshold: BasicRuleThreshold{
Name: InfoThresholdName,
TargetValue: &target, // 100 bytes
TargetUnit: "bytes",
MatchType: AtleastOnce,
CompareOperator: ValueIsAbove,
Name: InfoThresholdName,
TargetValue: &target, // 100 bytes
TargetUnit: "bytes",
MatchType: AtleastOnce,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 0.15, Timestamp: 1000}, // 0.15KiB ≈ 153.6 bytes
},
},
@@ -124,22 +95,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "kibibytes to mebibytes conversion - should alert",
threshold: BasicRuleThreshold{
Name: ErrorThresholdName,
TargetValue: &target, // 100KiB
TargetUnit: "kbytes",
MatchType: AtleastOnce,
CompareOperator: ValueIsAbove,
Name: ErrorThresholdName,
TargetValue: &target, // 100KiB
TargetUnit: "kbytes",
MatchType: AtleastOnce,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 0.15, Timestamp: 1000},
},
},
@@ -150,22 +114,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "milliseconds to seconds with ValueIsBelow - should alert",
threshold: BasicRuleThreshold{
Name: WarningThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: AtleastOnce,
CompareOperator: ValueIsBelow,
Name: WarningThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: AtleastOnce,
CompareOp: ValueIsBelow,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 0.05, Timestamp: 1000}, // 50ms in seconds
},
},
@@ -175,22 +132,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "milliseconds to seconds with OnAverage - should alert",
threshold: BasicRuleThreshold{
Name: CriticalThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: OnAverage,
CompareOperator: ValueIsAbove,
Name: CriticalThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: OnAverage,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 0.08, Timestamp: 1000}, // 80ms
{Value: 0.12, Timestamp: 2000}, // 120ms
{Value: 0.15, Timestamp: 3000}, // 150ms
@@ -202,22 +152,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "decimal megabytes to gigabytes with InTotal - should alert",
threshold: BasicRuleThreshold{
Name: WarningThresholdName,
TargetValue: &target, // 100MB
TargetUnit: "decmbytes",
MatchType: InTotal,
CompareOperator: ValueIsAbove,
Name: WarningThresholdName,
TargetValue: &target, // 100MB
TargetUnit: "decmbytes",
MatchType: InTotal,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 0.04, Timestamp: 1000}, // 40MB
{Value: 0.05, Timestamp: 2000}, // 50MB
{Value: 0.03, Timestamp: 3000}, // 30MB
@@ -229,22 +172,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "milliseconds to seconds with AllTheTimes - should alert",
threshold: BasicRuleThreshold{
Name: InfoThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: AllTheTimes,
CompareOperator: ValueIsAbove,
Name: InfoThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: AllTheTimes,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 0.11, Timestamp: 1000}, // 110ms
{Value: 0.12, Timestamp: 2000}, // 120ms
{Value: 0.15, Timestamp: 3000}, // 150ms
@@ -256,22 +192,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "kilobytes to megabytes with Last - should not alert",
threshold: BasicRuleThreshold{
Name: ErrorThresholdName,
TargetValue: &target, // 100kB
TargetUnit: "deckbytes",
MatchType: Last,
CompareOperator: ValueIsAbove,
Name: ErrorThresholdName,
TargetValue: &target, // 100kB
TargetUnit: "deckbytes",
MatchType: Last,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 0.15, Timestamp: 1000}, // 150kB
{Value: 0.05, Timestamp: 2000}, // 50kB (last value)
},
@@ -283,22 +212,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "bytes per second to kilobytes per second - should alert",
threshold: BasicRuleThreshold{
Name: CriticalThresholdName,
TargetValue: &target, // 100 bytes/s
TargetUnit: "Bps",
MatchType: AtleastOnce,
CompareOperator: ValueIsAbove,
Name: CriticalThresholdName,
TargetValue: &target, // 100 bytes/s
TargetUnit: "Bps",
MatchType: AtleastOnce,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 0.15, Timestamp: 1000},
},
},
@@ -309,22 +231,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "same unit - no conversion needed - should alert",
threshold: BasicRuleThreshold{
Name: InfoThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: AtleastOnce,
CompareOperator: ValueIsAbove,
Name: InfoThresholdName,
TargetValue: &target, // 100ms
TargetUnit: "ms",
MatchType: AtleastOnce,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 150, Timestamp: 1000}, // 150ms
},
},
@@ -335,22 +250,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "empty unit - no conversion - should alert",
threshold: BasicRuleThreshold{
Name: ErrorThresholdName,
TargetValue: &target, // 100 (unitless)
TargetUnit: "",
MatchType: AtleastOnce,
CompareOperator: ValueIsAbove,
Name: ErrorThresholdName,
TargetValue: &target, // 100 (unitless)
TargetUnit: "",
MatchType: AtleastOnce,
CompareOp: ValueIsAbove,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 150, Timestamp: 1000}, // 150 (unitless)
},
},
@@ -362,22 +270,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "bytes to Gibibytes - should alert",
threshold: BasicRuleThreshold{
Name: CriticalThresholdName,
TargetValue: &target, // 100 Gibibytes
TargetUnit: "GiBy",
MatchType: AtleastOnce,
CompareOperator: ValueIsBelow,
Name: CriticalThresholdName,
TargetValue: &target, // 100 Gibibytes
TargetUnit: "GiBy",
MatchType: AtleastOnce,
CompareOp: ValueIsBelow,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 70 * 1024 * 1024 * 1024, Timestamp: 1000}, // 70 Gibibytes
},
},
@@ -389,22 +290,15 @@ func TestBasicRuleThresholdEval_UnitConversion(t *testing.T) {
{
name: "bytes per second to MiB per second - should alert",
threshold: BasicRuleThreshold{
Name: CriticalThresholdName,
TargetValue: &target, // 100 MiB/s
TargetUnit: "MiBy/s",
MatchType: AtleastOnce,
CompareOperator: ValueIsBelow,
Name: CriticalThresholdName,
TargetValue: &target, // 100 MiB/s
TargetUnit: "MiBy/s",
MatchType: AtleastOnce,
CompareOp: ValueIsBelow,
},
series: &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{
{
Key: telemetrytypes.TelemetryFieldKey{
Name: "service",
},
Value: "test",
},
},
Values: []*qbtypes.TimeSeriesValue{
series: v3.Series{
Labels: map[string]string{"service": "test"},
Points: []v3.Point{
{Value: 30 * 1024 * 1024, Timestamp: 1000}, // 30 MiB/s
},
},
@@ -452,20 +346,20 @@ func TestPrepareSampleLabelsForRule(t *testing.T) {
alertAllHashes := make(map[uint64]struct{})
thresholdName := "test"
for range 50_000 {
sampleLabels := []*qbtypes.Label{
{Key: telemetrytypes.TelemetryFieldKey{Name: "service"}, Value: "test"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "env"}, Value: "prod"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "tier"}, Value: "backend"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "namespace"}, Value: "default"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "pod"}, Value: "test-pod"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "container"}, Value: "test-container"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "node"}, Value: "test-node"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "cluster"}, Value: "test-cluster"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "region"}, Value: "test-region"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "az"}, Value: "test-az"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "hostname"}, Value: "test-hostname"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "ip"}, Value: "192.168.1.1"},
{Key: telemetrytypes.TelemetryFieldKey{Name: "port"}, Value: "8080"},
sampleLabels := map[string]string{
"service": "test",
"env": "prod",
"tier": "backend",
"namespace": "default",
"pod": "test-pod",
"container": "test-container",
"node": "test-node",
"cluster": "test-cluster",
"region": "test-region",
"az": "test-az",
"hostname": "test-hostname",
"ip": "192.168.1.1",
"port": "8080",
}
lbls := PrepareSampleLabelsForRule(sampleLabels, thresholdName)
assert.True(t, lbls.Has(LabelThresholdName), "LabelThresholdName not found in labels")

View File

@@ -90,6 +90,11 @@ func (n *JSONAccessNode) FieldPath() string {
return n.Parent.Alias() + "." + key
}
// IsNonNestedPath reports whether this node's field path contains no array
// hops, i.e. the ArraySep separator never appears anywhere in the path.
func (n *JSONAccessNode) IsNonNestedPath() bool {
	fullPath := n.FieldPath()
	return !strings.Contains(fullPath, ArraySep)
}
func (n *JSONAccessNode) BranchesInOrder() []JSONAccessBranchType {
return slices.SortedFunc(maps.Keys(n.Branches), func(a, b JSONAccessBranchType) int {
return strings.Compare(b.StringValue(), a.StringValue())

View File

@@ -8,65 +8,102 @@ package telemetrytypes
// This represents the type information available in the test JSON structure.
// TestJSONTypeSet returns the path → JSON-type mapping for the test JSON
// structure, plus a nil MetadataStore placeholder. Each path maps to every
// JSON data type observed for it; multi-entry values model type-ambiguous
// fields. The displayed literal previously carried both the legacy and the
// reorganized entry sets, which duplicated constant map keys (a compile
// error in Go); only the reorganized set is kept.
func TestJSONTypeSet() (map[string][]JSONDataType, MetadataStore) {
	types := map[string][]JSONDataType{
		// ── user (primitives) ─────────────────────────────────────────────
		"user.name":        {String},
		"user.permissions": {ArrayString},
		"user.age":         {Int64, String}, // Int64/String ambiguity
		"user.height":      {Float64},
		"user.active":      {Bool}, // Bool — not IndexSupported
		// Deeper non-array nesting (a.b.c — no array hops)
		"user.address.zip": {Int64},
		// ── education[] ───────────────────────────────────────────────────
		// Pattern: x[].y
		"education":              {ArrayJSON},
		"education[].name":       {String},
		"education[].type":       {String, Int64},
		"education[].year":       {Int64},
		"education[].scores":     {ArrayInt64},
		"education[].parameters": {ArrayFloat64, ArrayDynamic},
		// Pattern: x[].y[]
		"education[].awards": {ArrayDynamic, ArrayJSON},
		// Pattern: x[].y[].z
		"education[].awards[].name":     {String},
		"education[].awards[].type":     {String},
		"education[].awards[].semester": {Int64},
		// Pattern: x[].y[].z[]
		"education[].awards[].participated": {ArrayDynamic, ArrayJSON},
		// Pattern: x[].y[].z[].w
		"education[].awards[].participated[].members": {ArrayString},
		// Pattern: x[].y[].z[].w[]
		"education[].awards[].participated[].team": {ArrayJSON},
		// Pattern: x[].y[].z[].w[].v
		"education[].awards[].participated[].team[].branch": {String},
		// ── interests[] ───────────────────────────────────────────────────
		"interests":                      {ArrayJSON},
		"interests[].entities":           {ArrayJSON},
		"interests[].entities[].reviews": {ArrayJSON},
		"interests[].entities[].reviews[].entries":                                  {ArrayJSON},
		"interests[].entities[].reviews[].entries[].metadata":                       {ArrayJSON},
		"interests[].entities[].reviews[].entries[].metadata[].positions":           {ArrayJSON},
		"interests[].entities[].reviews[].entries[].metadata[].positions[].name":    {String},
		"interests[].entities[].reviews[].entries[].metadata[].positions[].ratings": {ArrayInt64, ArrayString},
		// ── http-events[] ─────────────────────────────────────────────────
		"http-events":                     {ArrayJSON},
		"http-events[].request-info.host": {String},
		// ── top-level primitives ──────────────────────────────────────────
		"message":     {String},
		"http-status": {Int64, String}, // hyphen in root key, ambiguous
		// ── top-level nested objects (no array hops) ───────────────────────
		"response.time-taken": {Float64}, // hyphen inside nested key
	}
	return types, nil
}
// TestIndexedPathEntry pairs a JSON path with one of its JSON types to
// describe a field that is backed by a ClickHouse skip index in the test
// data.
//
// Valid entries are restricted to non-array paths whose type is
// IndexSupported (String, Int64, Float64); array paths and Bool fields
// cannot carry a skip index.
//
// The ColumnExpression for an entry is derived at test-setup time from the
// access plan, since it depends on the column name (e.g. body_v2), which
// this package does not know.
type TestIndexedPathEntry struct {
	Path string
	Type JSONDataType
}
// TestIndexedPaths lists path+type pairs from TestJSONTypeSet that are
// backed by a JSON data type index. Test setup uses this to populate
// key.Indexes after calling SetJSONAccessPlan.
//
// Intentionally excluded:
//   - user.active → Bool, IndexSupported=false
var TestIndexedPaths = []TestIndexedPathEntry{
	// user primitives
	{Path: "user.name", Type: String},
	// user.address — deeper non-array nesting
	{Path: "user.address.zip", Type: Int64},
	// root-level with special characters (hyphen); the type is ambiguous,
	// so one entry is listed per indexed type of the same path.
	{Path: "http-status", Type: Int64},
	{Path: "http-status", Type: String},
	// root-level nested objects (no array hops)
	{Path: "response.time-taken", Type: Float64},
}