chore: use new querier for v5 versioned alerts (#8650)

This commit is contained in:
Srikanth Chekuri
2025-07-30 19:25:27 +05:30
committed by GitHub
parent 1ce150d4b0
commit 5c1f070d8f
16 changed files with 806 additions and 1240 deletions

View File

@@ -3,6 +3,7 @@ package app
import (
"context"
"fmt"
"log/slog"
"net"
"net/http"
_ "net/http/pprof" // http profiler
@@ -18,6 +19,7 @@ import (
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/signoz"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
@@ -104,6 +106,8 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
signoz.TelemetryStore,
signoz.Prometheus,
signoz.Modules.OrgGetter,
signoz.Querier,
signoz.Instrumentation.Logger(),
)
if err != nil {
@@ -421,6 +425,8 @@ func makeRulesManager(
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
orgGetter organization.Getter,
querier querier.Querier,
logger *slog.Logger,
) (*baserules.Manager, error) {
// create manager opts
managerOpts := &baserules.ManagerOptions{
@@ -429,6 +435,8 @@ func makeRulesManager(
Context: context.Background(),
Logger: zap.L(),
Reader: ch,
Querier: querier,
SLogger: logger,
Cache: cache,
EvalDelay: baseconst.GetEvalDelay(),
PrepareTaskFunc: rules.PrepareTaskFunc,

View File

@@ -4,17 +4,17 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"math"
"strings"
"sync"
"time"
"go.uber.org/zap"
"github.com/SigNoz/signoz/ee/query-service/anomaly"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/transition"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -30,6 +30,11 @@ import (
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
querierV5 "github.com/SigNoz/signoz/pkg/querier"
anomalyV2 "github.com/SigNoz/signoz/ee/anomaly"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
yaml "gopkg.in/yaml.v2"
)
@@ -47,7 +52,14 @@ type AnomalyRule struct {
// querierV2 is used for alerts created after the introduction of new metrics query builder
querierV2 interfaces.Querier
provider anomaly.Provider
// querierV5 is used for alerts migrated after the introduction of new query builder
querierV5 querierV5.Querier
provider anomaly.Provider
providerV2 anomalyV2.Provider
version string
logger *slog.Logger
seasonality anomaly.Seasonality
}
@@ -57,11 +69,15 @@ func NewAnomalyRule(
orgID valuer.UUID,
p *ruletypes.PostableRule,
reader interfaces.Reader,
querierV5 querierV5.Querier,
logger *slog.Logger,
cache cache.Cache,
opts ...baserules.RuleOption,
) (*AnomalyRule, error) {
zap.L().Info("creating new AnomalyRule", zap.String("id", id), zap.Any("opts", opts))
logger.Info("creating new AnomalyRule", "rule_id", id)
opts = append(opts, baserules.WithLogger(logger))
if p.RuleCondition.CompareOp == ruletypes.ValueIsBelow {
target := -1 * *p.RuleCondition.Target
@@ -88,7 +104,7 @@ func NewAnomalyRule(
t.seasonality = anomaly.SeasonalityDaily
}
zap.L().Info("using seasonality", zap.String("seasonality", t.seasonality.String()))
logger.Info("using seasonality", "seasonality", t.seasonality.String())
querierOptsV2 := querierV2.QuerierOptions{
Reader: reader,
@@ -117,6 +133,27 @@ func NewAnomalyRule(
anomaly.WithReader[*anomaly.WeeklyProvider](reader),
)
}
if t.seasonality == anomaly.SeasonalityHourly {
t.providerV2 = anomalyV2.NewHourlyProvider(
anomalyV2.WithQuerier[*anomalyV2.HourlyProvider](querierV5),
anomalyV2.WithLogger[*anomalyV2.HourlyProvider](logger),
)
} else if t.seasonality == anomaly.SeasonalityDaily {
t.providerV2 = anomalyV2.NewDailyProvider(
anomalyV2.WithQuerier[*anomalyV2.DailyProvider](querierV5),
anomalyV2.WithLogger[*anomalyV2.DailyProvider](logger),
)
} else if t.seasonality == anomaly.SeasonalityWeekly {
t.providerV2 = anomalyV2.NewWeeklyProvider(
anomalyV2.WithQuerier[*anomalyV2.WeeklyProvider](querierV5),
anomalyV2.WithLogger[*anomalyV2.WeeklyProvider](logger),
)
}
t.querierV5 = querierV5
t.version = p.Version
t.logger = logger
return &t, nil
}
@@ -124,9 +161,11 @@ func (r *AnomalyRule) Type() ruletypes.RuleType {
return RuleTypeAnomaly
}
func (r *AnomalyRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, error) {
func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) (*v3.QueryRangeParamsV3, error) {
zap.L().Info("prepareQueryRange", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.EvalWindow().Milliseconds()), zap.Int64("evalDelay", r.EvalDelay().Milliseconds()))
r.logger.InfoContext(
ctx, "prepare query range request v4", "ts", ts.UnixMilli(), "eval_window", r.EvalWindow().Milliseconds(), "eval_delay", r.EvalDelay().Milliseconds(),
)
start := ts.Add(-time.Duration(r.EvalWindow())).UnixMilli()
end := ts.UnixMilli()
@@ -156,13 +195,33 @@ func (r *AnomalyRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, e
}, nil
}
func (r *AnomalyRule) prepareQueryRangeV5(ctx context.Context, ts time.Time) (*qbtypes.QueryRangeRequest, error) {
r.logger.InfoContext(ctx, "prepare query range request v5", "ts", ts.UnixMilli(), "eval_window", r.EvalWindow().Milliseconds(), "eval_delay", r.EvalDelay().Milliseconds())
startTs, endTs := r.Timestamps(ts)
start, end := startTs.UnixMilli(), endTs.UnixMilli()
req := &qbtypes.QueryRangeRequest{
Start: uint64(start),
End: uint64(end),
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: qbtypes.CompositeQuery{
Queries: make([]qbtypes.QueryEnvelope, 0),
},
NoCache: true,
}
copy(r.Condition().CompositeQuery.Queries, req.CompositeQuery.Queries)
return req, nil
}
func (r *AnomalyRule) GetSelectedQuery() string {
return r.Condition().GetSelectedQueryName()
}
func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRange(ts)
params, err := r.prepareQueryRange(ctx, ts)
if err != nil {
return nil, err
}
@@ -190,7 +249,50 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
var resultVector ruletypes.Vector
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores)
zap.L().Info("anomaly scores", zap.String("scores", string(scoresJSON)))
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
for _, series := range queryResult.AnomalyScores {
smpl, shouldAlert := r.ShouldAlert(*series)
if shouldAlert {
resultVector = append(resultVector, smpl)
}
}
return resultVector, nil
}
func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID, ts time.Time) (ruletypes.Vector, error) {
params, err := r.prepareQueryRangeV5(ctx, ts)
if err != nil {
return nil, err
}
anomalies, err := r.providerV2.GetAnomalies(ctx, orgID, &anomalyV2.AnomaliesRequest{
Params: *params,
Seasonality: anomalyV2.Seasonality{String: valuer.NewString(r.seasonality.String())},
})
if err != nil {
return nil, err
}
var qbResult *qbtypes.TimeSeriesData
for _, result := range anomalies.Results {
if result.QueryName == r.GetSelectedQuery() {
qbResult = result
break
}
}
if qbResult == nil {
r.logger.WarnContext(ctx, "nil qb result", "ts", ts.UnixMilli())
}
queryResult := transition.ConvertV5TimeSeriesDataToV4Result(qbResult)
var resultVector ruletypes.Vector
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores)
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))
for _, series := range queryResult.AnomalyScores {
smpl, shouldAlert := r.ShouldAlert(*series)
@@ -206,8 +308,17 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, erro
prevState := r.State()
valueFormatter := formatter.FromUnit(r.Unit())
res, err := r.buildAndRunQuery(ctx, r.OrgID(), ts)
var res ruletypes.Vector
var err error
if r.version == "v5" {
r.logger.InfoContext(ctx, "running v5 query")
res, err = r.buildAndRunQueryV5(ctx, r.OrgID(), ts)
} else {
r.logger.InfoContext(ctx, "running v4 query")
res, err = r.buildAndRunQuery(ctx, r.OrgID(), ts)
}
if err != nil {
return nil, err
}
@@ -226,7 +337,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, erro
value := valueFormatter.Format(smpl.V, r.Unit())
threshold := valueFormatter.Format(r.TargetVal(), r.Unit())
zap.L().Debug("Alert template data for rule", zap.String("name", r.Name()), zap.String("formatter", valueFormatter.Name()), zap.String("value", value), zap.String("threshold", threshold))
r.logger.DebugContext(ctx, "Alert template data for rule", "rule_name", r.Name(), "formatter", valueFormatter.Name(), "value", value, "threshold", threshold)
tmplData := ruletypes.AlertTemplateData(l, value, threshold)
// Inject some convenience variables that are easier to remember for users
@@ -247,7 +358,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, erro
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
zap.L().Error("Expanding alert template failed", zap.Error(err), zap.Any("data", tmplData))
r.logger.ErrorContext(ctx, "Expanding alert template failed", "error", err, "data", tmplData, "rule_name", r.Name())
}
return result
}
@@ -276,7 +387,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, erro
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
zap.L().Error("the alert query returns duplicate records", zap.String("ruleid", r.ID()), zap.Any("alert", alerts[h]))
r.logger.ErrorContext(ctx, "the alert query returns duplicate records", "rule_id", r.ID(), "alert", alerts[h])
err = fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
return nil, err
}
@@ -294,7 +405,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, erro
}
}
zap.L().Info("number of alerts found", zap.String("name", r.Name()), zap.Int("count", len(alerts)))
r.logger.InfoContext(ctx, "number of alerts found", "rule_name", r.Name(), "alerts_count", len(alerts))
// alerts[h] is ready, add or update active list now
for h, a := range alerts {
@@ -317,7 +428,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (interface{}, erro
for fp, a := range r.Active {
labelsJSON, err := json.Marshal(a.QueryResultLables)
if err != nil {
zap.L().Error("error marshaling labels", zap.Error(err), zap.Any("labels", a.Labels))
r.logger.ErrorContext(ctx, "error marshaling labels", "error", err, "labels", a.Labels)
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given

View File

@@ -27,6 +27,8 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.OrgID,
opts.Rule,
opts.Reader,
opts.Querier,
opts.SLogger,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
)
@@ -47,7 +49,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
ruleId,
opts.OrgID,
opts.Rule,
opts.Logger,
opts.SLogger,
opts.Reader,
opts.ManagerOpts.Prometheus,
baserules.WithSQLStore(opts.SQLStore),
@@ -69,6 +71,8 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.OrgID,
opts.Rule,
opts.Reader,
opts.Querier,
opts.SLogger,
opts.Cache,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
@@ -126,6 +130,8 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.OrgID,
parsedRule,
opts.Reader,
opts.Querier,
opts.SLogger,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
@@ -143,7 +149,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
alertname,
opts.OrgID,
parsedRule,
opts.Logger,
opts.SLogger,
opts.Reader,
opts.ManagerOpts.Prometheus,
baserules.WithSendAlways(),
@@ -162,6 +168,8 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
opts.OrgID,
parsedRule,
opts.Reader,
opts.Querier,
opts.SLogger,
opts.Cache,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),