Compare commits


5 Commits

Author             SHA1        Message                                    Date
Jatinderjit Singh  6faa86baf0  Use logger.With for rule.id                2026-04-07 23:43:04 +05:30
Jatinderjit Singh  02a5bf832f  Ignore SQLite shm and wal files            2026-04-07 23:43:04 +05:30
Jatinderjit Singh  2010dd6df7  Remove unnecessary nesting of notifyFunc   2026-04-07 23:43:04 +05:30
Jatinderjit Singh  41529219e4  Log stacktrace for panics                  2026-04-07 23:43:04 +05:30
Jatinderjit Singh  16a9df5244  Log receiverName                           2026-04-07 23:43:04 +05:30
12 changed files with 164 additions and 281 deletions

.gitignore vendored

@@ -51,6 +51,8 @@ ee/query-service/tests/test-deploy/data/
# local data
*.backup
*.db
*.db-shm
*.db-wal
**/db
/deploy/docker/clickhouse-setup/data/
/deploy/docker-swarm/clickhouse-setup/data/
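
The two new ignore entries cover SQLite's write-ahead-log sidecar files: once a database runs in WAL mode, SQLite keeps a .db-wal log and a .db-shm shared-memory index next to the .db file, so local development databases now leave those behind. A minimal sketch of where the files come from, assuming the mattn/go-sqlite3 driver (the driver choice is an assumption; any SQLite driver behaves the same):

```go
package main

import (
	"database/sql"
	"fmt"
	"os"

	_ "github.com/mattn/go-sqlite3" // driver choice is an assumption for this sketch
)

func main() {
	db, err := sql.Open("sqlite3", "local.db")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Switching to write-ahead logging makes SQLite maintain local.db-wal
	// (the log) and local.db-shm (the shared-memory index) beside local.db.
	if _, err := db.Exec(`PRAGMA journal_mode=WAL;`); err != nil {
		panic(err)
	}
	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS t (id INTEGER)`); err != nil {
		panic(err)
	}

	for _, name := range []string{"local.db", "local.db-wal", "local.db-shm"} {
		if _, err := os.Stat(name); err == nil {
			fmt.Println("exists:", name)
		}
	}
}
```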


@@ -49,7 +49,6 @@ func NewAnomalyRule(
logger *slog.Logger,
opts ...baserules.RuleOption,
) (*AnomalyRule, error) {
logger.Info("creating new AnomalyRule", slog.String("rule.id", id))
opts = append(opts, baserules.WithLogger(logger))
@@ -59,44 +58,44 @@ func NewAnomalyRule(
return nil, err
}
t := AnomalyRule{
r := AnomalyRule{
BaseRule: baseRule,
querier: querier,
version: p.Version,
logger: logger.With(slog.String("rule.id", id)),
}
switch p.RuleCondition.Seasonality {
case ruletypes.SeasonalityHourly:
t.seasonality = anomaly.SeasonalityHourly
r.seasonality = anomaly.SeasonalityHourly
case ruletypes.SeasonalityDaily:
t.seasonality = anomaly.SeasonalityDaily
r.seasonality = anomaly.SeasonalityDaily
case ruletypes.SeasonalityWeekly:
t.seasonality = anomaly.SeasonalityWeekly
r.seasonality = anomaly.SeasonalityWeekly
default:
t.seasonality = anomaly.SeasonalityDaily
r.seasonality = anomaly.SeasonalityDaily
}
logger.Info("using seasonality", slog.String("rule.id", id), slog.String("rule.seasonality", t.seasonality.StringValue()))
r.logger.Info("using seasonality", slog.String("rule.seasonality", r.seasonality.StringValue()))
if t.seasonality == anomaly.SeasonalityHourly {
t.provider = anomaly.NewHourlyProvider(
if r.seasonality == anomaly.SeasonalityHourly {
r.provider = anomaly.NewHourlyProvider(
anomaly.WithQuerier[*anomaly.HourlyProvider](querier),
anomaly.WithLogger[*anomaly.HourlyProvider](logger),
anomaly.WithLogger[*anomaly.HourlyProvider](r.logger),
)
} else if t.seasonality == anomaly.SeasonalityDaily {
t.provider = anomaly.NewDailyProvider(
} else if r.seasonality == anomaly.SeasonalityDaily {
r.provider = anomaly.NewDailyProvider(
anomaly.WithQuerier[*anomaly.DailyProvider](querier),
anomaly.WithLogger[*anomaly.DailyProvider](logger),
anomaly.WithLogger[*anomaly.DailyProvider](r.logger),
)
} else if t.seasonality == anomaly.SeasonalityWeekly {
t.provider = anomaly.NewWeeklyProvider(
} else if r.seasonality == anomaly.SeasonalityWeekly {
r.provider = anomaly.NewWeeklyProvider(
anomaly.WithQuerier[*anomaly.WeeklyProvider](querier),
anomaly.WithLogger[*anomaly.WeeklyProvider](logger),
anomaly.WithLogger[*anomaly.WeeklyProvider](r.logger),
)
}
t.querier = querier
t.version = p.Version
t.logger = logger
return &t, nil
return &r, nil
}
func (r *AnomalyRule) Type() ruletypes.RuleType {
@@ -104,8 +103,11 @@ func (r *AnomalyRule) Type() ruletypes.RuleType {
}
func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) *qbtypes.QueryRangeRequest {
r.logger.InfoContext(ctx, "prepare query range request", slog.String("rule.id", r.ID()), slog.Int64("ts", ts.UnixMilli()), slog.Int64("eval.window_ms", r.EvalWindow().Milliseconds()), slog.Int64("eval.delay_ms", r.EvalDelay().Milliseconds()))
r.logger.InfoContext(
ctx, "prepare query range request", slog.Int64("ts", ts.UnixMilli()),
slog.Int64("eval.window_ms", r.EvalWindow().Milliseconds()),
slog.Int64("eval.delay_ms", r.EvalDelay().Milliseconds()),
)
startTs, endTs := r.Timestamps(ts)
start, end := startTs.UnixMilli(), endTs.UnixMilli()
@@ -145,7 +147,7 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
}
if queryResult == nil {
r.logger.WarnContext(ctx, "nil qb result", slog.String("rule.id", r.ID()), slog.Int64("ts", ts.UnixMilli()))
r.logger.WarnContext(ctx, "nil qb result", slog.Int64("ts", ts.UnixMilli()))
return ruletypes.Vector{}, nil
}
@@ -156,7 +158,7 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
return ruletypes.Vector{*missingDataAlert}, nil
} else if !hasData {
r.logger.WarnContext(ctx, "no anomaly result", slog.String("rule.id", r.ID()))
r.logger.WarnContext(ctx, "no anomaly result")
return ruletypes.Vector{}, nil
}
@@ -164,7 +166,7 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
scoresJSON, _ := json.Marshal(queryResult.Aggregations[0].AnomalyScores)
// TODO(srikanthccv): this could be noisy but we do this to answer false alert requests
r.logger.InfoContext(ctx, "anomaly scores", slog.String("rule.id", r.ID()), slog.String("anomaly.scores", string(scoresJSON)))
r.logger.InfoContext(ctx, "anomaly scores", slog.String("anomaly.scores", string(scoresJSON)))
// Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.Aggregations[0].AnomalyScores
@@ -172,7 +174,7 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
// In case of error we log the error and continue with the original series
if filterErr != nil {
r.logger.ErrorContext(ctx, "error filtering new series", slog.String("rule.id", r.ID()), errors.Attr(filterErr))
r.logger.ErrorContext(ctx, "error filtering new series", errors.Attr(filterErr))
} else {
seriesToProcess = filteredSeries
}
@@ -180,7 +182,11 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
for _, series := range seriesToProcess {
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", slog.String("rule.id", r.ID()), slog.Int("series.num_points", len(series.Values)), slog.Int("series.required_points", r.Condition().RequiredNumPoints))
r.logger.InfoContext(
ctx, "not enough data points to evaluate series, skipping",
slog.Int("series.num_points", len(series.Values)),
slog.Int("series.required_points", r.Condition().RequiredNumPoints),
)
continue
}
results, err := r.Threshold.Eval(series, r.Unit(), ruletypes.EvalData{
@@ -204,7 +210,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
var res ruletypes.Vector
var err error
r.logger.InfoContext(ctx, "running query", slog.String("rule.id", r.ID()))
r.logger.InfoContext(ctx, "running query")
res, err = r.buildAndRunQuery(ctx, r.OrgID(), ts)
if err != nil {
@@ -230,7 +236,10 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
}
value := valueFormatter.Format(smpl.V, r.Unit())
threshold := valueFormatter.Format(smpl.Target, smpl.TargetUnit)
r.logger.DebugContext(ctx, "alert template data for rule", slog.String("rule.id", r.ID()), slog.String("formatter.name", valueFormatter.Name()), slog.String("alert.value", value), slog.String("alert.threshold", threshold))
r.logger.DebugContext(
ctx, "alert template data for rule", slog.String("formatter.name", valueFormatter.Name()),
slog.String("alert.value", value), slog.String("alert.threshold", threshold),
)
tmplData := ruletypes.AlertTemplateData(l, value, threshold)
// Inject some convenience variables that are easier to remember for users
@@ -250,7 +259,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
r.logger.ErrorContext(ctx, "expanding alert template failed", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("alert.template_data", tmplData))
r.logger.ErrorContext(ctx, "expanding alert template failed", errors.Attr(err), slog.Any("alert.template_data", tmplData))
}
return result
}
@@ -280,7 +289,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
resultFPs[h] = struct{}{}
if _, ok := alerts[h]; ok {
r.logger.ErrorContext(ctx, "the alert query returns duplicate records", slog.String("rule.id", r.ID()), slog.Any("alert", alerts[h]))
r.logger.ErrorContext(ctx, "the alert query returns duplicate records", slog.Any("alert", alerts[h]))
err = errors.NewInternalf(errors.CodeInternal, "duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
return 0, err
}
@@ -299,7 +308,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
}
}
r.logger.InfoContext(ctx, "number of alerts found", slog.String("rule.id", r.ID()), slog.Int("alert.count", len(alerts)))
r.logger.InfoContext(ctx, "number of alerts found", slog.Int("alert.count", len(alerts)))
// alerts[h] is ready, add or update active list now
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
@@ -326,7 +335,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
for fp, a := range r.Active {
labelsJSON, err := json.Marshal(a.QueryResultLabels)
if err != nil {
r.logger.ErrorContext(ctx, "error marshaling labels", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("alert.labels", a.Labels))
r.logger.ErrorContext(ctx, "error marshaling labels", errors.Attr(err), slog.Any("alert.labels", a.Labels))
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
@@ -381,7 +390,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
state = ruletypes.StateFiring
}
a.State = state
r.logger.DebugContext(ctx, "converting alert state", slog.String("rule.id", r.ID()), slog.Any("alert.state", state))
r.logger.DebugContext(ctx, "converting alert state", slog.Any("alert.state", state))
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),
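
The recurring edit in this file is the first commit's theme: rule.id moves out of every individual log call and onto the logger itself via logger.With at construction time, so call sites only pass the attributes that vary. A minimal sketch of the pattern with the standard library log/slog (the helper and IDs here are illustrative, not taken from the repo):

```go
package main

import (
	"context"
	"log/slog"
	"os"
)

// newRuleLogger derives a child logger that carries rule.id on every record,
// so individual log calls no longer need to repeat the attribute.
func newRuleLogger(base *slog.Logger, ruleID string) *slog.Logger {
	return base.With(slog.String("rule.id", ruleID))
}

func main() {
	base := slog.New(slog.NewTextHandler(os.Stdout, nil))
	logger := newRuleLogger(base, "rule-123")

	// Both records include rule.id=rule-123 without passing it explicitly.
	logger.Info("creating rule")
	logger.InfoContext(context.Background(), "running query", slog.Int("alert.count", 3))
}
```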


@@ -328,7 +328,12 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *dispatch.Route) {
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
logger := d.logger.With(slog.Int("num_alerts", len(alerts)), errors.Attr(err))
receiverName, _ := notify.ReceiverName(ctx)
logger := d.logger.With(
slog.String("receiver", receiverName),
slog.Int("num_alerts", len(alerts)),
errors.Attr(err),
)
if errors.Is(ctx.Err(), context.Canceled) {
// It is expected for the context to be canceled on
// configuration reload or shutdown. In this case, the
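
The receiverName commit enriches the dispatcher's error logger so a failed notify stage identifies which integration it was for. A rough sketch of the same idea, assuming the alertmanager notify package's ReceiverName/WithReceiverName helpers used in the hunk above (the surrounding function is illustrative):

```go
package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/prometheus/alertmanager/notify"
)

// logStageError builds a per-call logger carrying the receiver name and the
// number of alerts, so the error record says which integration failed.
func logStageError(ctx context.Context, base *slog.Logger, numAlerts int, err error) {
	receiverName, _ := notify.ReceiverName(ctx) // empty if not set on the context
	base.With(
		slog.String("receiver", receiverName),
		slog.Int("num_alerts", numAlerts),
	).ErrorContext(ctx, "notify stage failed", slog.Any("error", err))
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	ctx := notify.WithReceiverName(context.Background(), "slack-oncall")
	logStageError(ctx, logger, 2, context.DeadlineExceeded)
}
```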


@@ -383,15 +383,15 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
spec.Aggregations[i].Temporality = temp
}
}
if spec.Aggregations[i].Temporality == metrictypes.Unknown {
missingMetrics = append(missingMetrics, spec.Aggregations[i].MetricName)
continue
}
if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
if foundMetricType, ok := metricTypes[spec.Aggregations[i].MetricName]; ok && foundMetricType != metrictypes.UnspecifiedType {
spec.Aggregations[i].Type = foundMetricType
}
}
if spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
missingMetrics = append(missingMetrics, spec.Aggregations[i].MetricName)
continue
}
presentAggregations = append(presentAggregations, spec.Aggregations[i])
}
if len(presentAggregations) == 0 {


@@ -1,145 +0,0 @@
package querier
import (
"context"
"testing"
"time"
cmock "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type queryMatcherAny struct{}
func (m *queryMatcherAny) Match(string, string) error { return nil }
// mockMetricStmtBuilder implements qbtypes.StatementBuilder[qbtypes.MetricAggregation]
// and returns a fixed query string so the mock ClickHouse can match it.
type mockMetricStmtBuilder struct{}
func (m *mockMetricStmtBuilder) Build(_ context.Context, _, _ uint64, _ qbtypes.RequestType, _ qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], _ map[string]qbtypes.VariableItem) (*qbtypes.Statement, error) {
return &qbtypes.Statement{
Query: "SELECT ts, value FROM signoz_metrics",
Args: nil,
}, nil
}
func TestQueryRange_MetricTypeMissing(t *testing.T) {
// When a metric has UnspecifiedType and is not found in the metadata store,
// the querier should return a not-found error, even if the request provides a temporality
providerSettings := instrumentationtest.New().ToProviderSettings()
metadataStore := telemetrytypestest.NewMockMetadataStore()
q := New(
providerSettings,
nil, // telemetryStore
metadataStore,
nil, // prometheus
nil, // traceStmtBuilder
nil, // logStmtBuilder
nil, // metricStmtBuilder
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
)
req := &qbtypes.QueryRangeRequest{
Start: uint64(time.Now().Add(-5 * time.Minute).UnixMilli()),
End: uint64(time.Now().UnixMilli()),
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: time.Minute},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "unknown_metric",
Temporality: metrictypes.Cumulative,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
Signal: telemetrytypes.SignalMetrics,
},
}},
},
}
_, err := q.QueryRange(context.Background(), valuer.GenerateUUID(), req)
require.Error(t, err)
assert.Contains(t, err.Error(), "could not find the metric unknown_metric")
}
func TestQueryRange_MetricTypeFromStore(t *testing.T) {
// When a metric has UnspecifiedType but the metadata store returns a valid type,
// the metric should not be treated as missing.
providerSettings := instrumentationtest.New().ToProviderSettings()
metadataStore := telemetrytypestest.NewMockMetadataStore()
metadataStore.TypeMap["my_metric"] = metrictypes.SumType
metadataStore.TemporalityMap["my_metric"] = metrictypes.Cumulative
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
cols := []cmock.ColumnType{
{Name: "ts", Type: "DateTime"},
{Name: "value", Type: "Float64"},
}
rows := cmock.NewRows(cols, [][]any{
{time.Now(), float64(42)},
})
telemetryStore.Mock().
ExpectQuery("SELECT any").
WillReturnRows(rows)
q := New(
providerSettings,
telemetryStore,
metadataStore,
nil, // prometheus
nil, // traceStmtBuilder
nil, // logStmtBuilder
&mockMetricStmtBuilder{}, // metricStmtBuilder
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
)
req := &qbtypes.QueryRangeRequest{
Start: uint64(time.Now().Add(-5 * time.Minute).UnixMilli()),
End: uint64(time.Now().UnixMilli()),
RequestType: qbtypes.RequestTypeTimeSeries,
CompositeQuery: qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Name: "A",
StepInterval: qbtypes.Step{Duration: time.Minute},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "my_metric",
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
Signal: telemetrytypes.SignalMetrics,
},
}},
},
}
resp, err := q.QueryRange(context.Background(), valuer.GenerateUUID(), req)
require.NoError(t, err)
require.NotNil(t, resp)
}


@@ -110,7 +110,7 @@ func WithEvalDelay(dur valuer.TextDuration) RuleOption {
func WithLogger(logger *slog.Logger) RuleOption {
return func(r *BaseRule) {
r.logger = logger
r.logger = logger.With(slog.String("rule.id", r.id))
}
}
@@ -248,7 +248,7 @@ func (r *BaseRule) SelectedQuery(ctx context.Context) string {
if r.ruleCondition.SelectedQuery != "" {
return r.ruleCondition.SelectedQuery
}
r.logger.WarnContext(ctx, "missing selected query", slog.String("rule.id", r.ID()))
r.logger.WarnContext(ctx, "missing selected query")
return r.ruleCondition.SelectedQueryName()
}
@@ -386,7 +386,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
}
if err := r.ruleStateHistoryModule.RecordRuleStateHistory(ctx, r.ID(), r.handledRestart, itemsToAdd); err != nil {
r.logger.ErrorContext(ctx, "error while recording rule state history", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("items_to_add", itemsToAdd))
r.logger.ErrorContext(ctx, "error while recording rule state history", errors.Attr(err), slog.Any("items_to_add", itemsToAdd))
return err
}
r.handledRestart = true
@@ -580,7 +580,12 @@ func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*
// Check if first_seen + delay has passed
if maxFirstSeen+newGroupEvalDelayMs > evalTimeMs {
// Still within grace period, skip this series
r.logger.InfoContext(ctx, "skipping new series", slog.String("rule.id", r.ID()), slog.Int("series.index", i), slog.Int64("series.max_first_seen", maxFirstSeen), slog.Int64("eval.time_ms", evalTimeMs), slog.Int64("eval.delay_ms", newGroupEvalDelayMs), slog.Any("series.labels", series[i].Labels))
r.logger.InfoContext(
ctx, "skipping new series",
slog.Int("series.index", i), slog.Int64("series.max_first_seen", maxFirstSeen),
slog.Int64("eval.time_ms", evalTimeMs), slog.Int64("eval.delay_ms", newGroupEvalDelayMs),
slog.Any("series.labels", series[i].Labels),
)
continue
}
@@ -590,7 +595,11 @@ func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*
skippedCount := len(series) - len(filteredSeries)
if skippedCount > 0 {
r.logger.InfoContext(ctx, "filtered new series", slog.String("rule.id", r.ID()), slog.Int("series.skipped_count", skippedCount), slog.Int("series.total_count", len(series)), slog.Int64("eval.delay_ms", newGroupEvalDelayMs))
r.logger.InfoContext(
ctx, "filtered new series",
slog.Int("series.skipped_count", skippedCount), slog.Int("series.total_count", len(series)),
slog.Int64("eval.delay_ms", newGroupEvalDelayMs),
)
}
return filteredSeries, nil
@@ -611,7 +620,7 @@ func (r *BaseRule) HandleMissingDataAlert(ctx context.Context, ts time.Time, has
return nil
}
r.logger.InfoContext(ctx, "no data found for rule condition", slog.String("rule.id", r.ID()))
r.logger.InfoContext(ctx, "no data found for rule condition")
lbls := ruletypes.NewBuilder()
if !r.lastTimestampWithDatapoints.IsZero() {
lbls.Set(ruletypes.LabelLastSeen, r.lastTimestampWithDatapoints.Format(ruletypes.AlertTimeFormat))
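
WithLogger is where the rule.id attachment actually happens: the functional option now derives the child logger from the rule's own id when it is applied, so every concrete rule built on BaseRule inherits the attribute. A compact sketch of that option pattern (field and type names are illustrative, not the repo's):

```go
package main

import (
	"log/slog"
	"os"
)

// BaseRule and RuleOption mirror the shape of the functional-option pattern
// in the diff; only the pieces needed for the sketch are included.
type BaseRule struct {
	id     string
	logger *slog.Logger
}

type RuleOption func(*BaseRule)

// WithLogger attaches the rule's own id to the logger at option-apply time,
// so every rule built from this base logs rule.id automatically.
func WithLogger(logger *slog.Logger) RuleOption {
	return func(r *BaseRule) {
		r.logger = logger.With(slog.String("rule.id", r.id))
	}
}

func NewBaseRule(id string, opts ...RuleOption) *BaseRule {
	r := &BaseRule{id: id, logger: slog.Default()}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	base := slog.New(slog.NewTextHandler(os.Stdout, nil))
	r := NewBaseRule("rule-42", WithLogger(base))
	r.logger.Info("missing selected query") // record carries rule.id=rule-42
}
```

Note the option only works because it is applied after the id field is populated, which matches the constructor order implied by the diff.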


@@ -438,7 +438,7 @@ func (m *Manager) editTask(_ context.Context, orgID valuer.UUID, rule *ruletypes
Logger: m.opts.Logger,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
NotifyFunc: m.notifyFunc,
SQLStore: m.sqlstore,
OrgID: orgID,
})
@@ -651,7 +651,7 @@ func (m *Manager) addTask(_ context.Context, orgID valuer.UUID, rule *ruletypes.
Logger: m.opts.Logger,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
NotifyFunc: m.notifyFunc,
SQLStore: m.sqlstore,
OrgID: orgID,
})
@@ -754,68 +754,63 @@ func (m *Manager) TriggeredAlerts() []*ruletypes.NamedAlert {
// NotifyFunc sends notifications about a set of alerts generated by the given expression.
type NotifyFunc func(ctx context.Context, orgID string, expr string, alerts ...*ruletypes.Alert)
// prepareNotifyFunc implements the NotifyFunc for a Notifier.
func (m *Manager) prepareNotifyFunc() NotifyFunc {
return func(ctx context.Context, orgID string, expr string, alerts ...*ruletypes.Alert) {
var res []*alertmanagertypes.PostableAlert
// notifyFunc implements the NotifyFunc for a Notifier.
func (m *Manager) notifyFunc(ctx context.Context, orgID string, expr string, alerts ...*ruletypes.Alert) {
var res []*alertmanagertypes.PostableAlert
for _, alert := range alerts {
generatorURL := alert.GeneratorURL
for _, alert := range alerts {
generatorURL := alert.GeneratorURL
a := &alertmanagertypes.PostableAlert{
Annotations: alert.Annotations.Map(),
StartsAt: strfmt.DateTime(alert.FiredAt),
Alert: alertmanagertypes.AlertModel{
Labels: alert.Labels.Map(),
GeneratorURL: strfmt.URI(generatorURL),
},
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
} else {
a.EndsAt = strfmt.DateTime(alert.ValidUntil)
}
res = append(res, a)
a := &alertmanagertypes.PostableAlert{
Annotations: alert.Annotations.Map(),
StartsAt: strfmt.DateTime(alert.FiredAt),
Alert: alertmanagertypes.AlertModel{
Labels: alert.Labels.Map(),
GeneratorURL: strfmt.URI(generatorURL),
},
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
} else {
a.EndsAt = strfmt.DateTime(alert.ValidUntil)
}
if len(alerts) > 0 {
m.alertmanager.PutAlerts(ctx, orgID, res)
}
res = append(res, a)
}
if len(alerts) > 0 {
m.alertmanager.PutAlerts(ctx, orgID, res)
}
}
func (m *Manager) prepareTestNotifyFunc() NotifyFunc {
return func(ctx context.Context, orgID string, expr string, alerts ...*ruletypes.Alert) {
if len(alerts) == 0 {
return
}
ruleID := alerts[0].Labels.Map()[ruletypes.AlertRuleIDLabel]
receiverMap := make(map[*alertmanagertypes.PostableAlert][]string)
for _, alert := range alerts {
generatorURL := alert.GeneratorURL
func (m *Manager) testNotifyFunc(ctx context.Context, orgID string, expr string, alerts ...*ruletypes.Alert) {
if len(alerts) == 0 {
return
}
ruleID := alerts[0].Labels.Map()[ruletypes.AlertRuleIDLabel]
receiverMap := make(map[*alertmanagertypes.PostableAlert][]string)
for _, alert := range alerts {
generatorURL := alert.GeneratorURL
a := &alertmanagertypes.PostableAlert{}
a.Annotations = alert.Annotations.Map()
a.StartsAt = strfmt.DateTime(alert.FiredAt)
labelsMap := alert.Labels.Map()
labelsMap[ruletypes.TestAlertLabel] = "true"
a.Alert = alertmanagertypes.AlertModel{
Labels: labelsMap,
GeneratorURL: strfmt.URI(generatorURL),
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
} else {
a.EndsAt = strfmt.DateTime(alert.ValidUntil)
}
receiverMap[a] = alert.Receivers
a := &alertmanagertypes.PostableAlert{}
a.Annotations = alert.Annotations.Map()
a.StartsAt = strfmt.DateTime(alert.FiredAt)
labelsMap := alert.Labels.Map()
labelsMap[ruletypes.TestAlertLabel] = "true"
a.Alert = alertmanagertypes.AlertModel{
Labels: labelsMap,
GeneratorURL: strfmt.URI(generatorURL),
}
err := m.alertmanager.TestAlert(ctx, orgID, ruleID, receiverMap)
if err != nil {
m.logger.ErrorContext(ctx, "failed to send test notification", errors.Attr(err))
return
if !alert.ResolvedAt.IsZero() {
a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
} else {
a.EndsAt = strfmt.DateTime(alert.ValidUntil)
}
receiverMap[a] = alert.Receivers
}
err := m.alertmanager.TestAlert(ctx, orgID, ruleID, receiverMap)
if err != nil {
m.logger.ErrorContext(ctx, "failed to send test notification", errors.Attr(err))
}
}
@@ -1041,7 +1036,7 @@ func (m *Manager) TestNotification(ctx context.Context, orgID valuer.UUID, ruleS
Logger: m.opts.Logger,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareTestNotifyFunc(),
NotifyFunc: m.testNotifyFunc,
SQLStore: m.sqlstore,
OrgID: orgID,
})
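
The notifyFunc commit removes a level of indirection: prepareNotifyFunc and prepareTestNotifyFunc used to return closures, while notifyFunc and testNotifyFunc are now plain methods passed as method values. A small, self-contained illustration of why a method value satisfies a func-typed field directly (all names here are made up for the example):

```go
package main

import "fmt"

// NotifyFunc mirrors the shape of the manager's notification callback.
type NotifyFunc func(orgID string, alerts ...string)

type manager struct{ name string }

// notifyFunc is an ordinary method whose signature matches NotifyFunc.
func (m *manager) notifyFunc(orgID string, alerts ...string) {
	fmt.Printf("%s: sending %d alert(s) for org %s\n", m.name, len(alerts), orgID)
}

type taskOpts struct {
	Notify NotifyFunc
}

func main() {
	m := &manager{name: "rules-manager"}

	// A method value binds the receiver, so m.notifyFunc satisfies the
	// func-typed field directly; no wrapper closure is needed.
	opts := taskOpts{Notify: m.notifyFunc}
	opts.Notify("org-1", "high-cpu", "disk-full")
}
```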


@@ -48,14 +48,13 @@ func NewPromRule(
version: postableRule.Version,
prometheus: prometheus,
}
p.logger = logger
query, err := p.getPqlQuery(context.Background())
if err != nil {
// can not generate a valid prom QL query
return nil, err
}
logger.Info("creating new prom rule", slog.String("rule.id", id), slog.String("rule.query", query))
p.logger.Info("creating new prom rule", slog.String("rule.query", query))
return &p, nil
}
@@ -97,7 +96,7 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
if err != nil {
return nil, err
}
r.logger.InfoContext(ctx, "evaluating promql query", slog.String("rule.id", r.ID()), slog.String("rule.query", q))
r.logger.InfoContext(ctx, "evaluating promql query", slog.String("rule.query", q))
res, err := r.RunAlertQuery(ctx, q, start, end, interval)
if err != nil {
r.SetHealth(ruletypes.HealthBad)
@@ -117,7 +116,7 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, matrixToProcess)
// In case of error we log the error and continue with the original series
if filterErr != nil {
r.logger.ErrorContext(ctx, "error filtering new series", slog.String("rule.id", r.ID()), errors.Attr(filterErr))
r.logger.ErrorContext(ctx, "error filtering new series", errors.Attr(filterErr))
} else {
matrixToProcess = filteredSeries
}
@@ -129,7 +128,9 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(
ctx, "not enough data points to evaluate series, skipping",
"rule.id", r.ID(), "num_points", len(series.Values), "required_points", r.Condition().RequiredNumPoints,
slog.String("rule.id", r.ID()),
slog.Int("num_points", len(series.Values)),
slog.Int("required_points", r.Condition().RequiredNumPoints),
)
continue
}
@@ -173,7 +174,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
for _, lbl := range result.Metric {
l[lbl.Name] = lbl.Value
}
r.logger.DebugContext(ctx, "alerting for series", slog.String("rule.id", r.ID()), slog.Any("series", result))
r.logger.DebugContext(ctx, "alerting for series", slog.Any("series", result))
threshold := valueFormatter.Format(result.Target, result.TargetUnit)
@@ -193,7 +194,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
r.logger.WarnContext(ctx, "expanding alert template failed", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("alert.template_data", tmplData))
r.logger.WarnContext(ctx, "expanding alert template failed", errors.Attr(err), slog.Any("alert.template_data", tmplData))
}
return result
}
@@ -244,7 +245,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
}
}
r.logger.InfoContext(ctx, "number of alerts found", slog.String("rule.id", r.ID()), slog.Int("alert.count", len(alerts)))
r.logger.InfoContext(ctx, "number of alerts found", slog.Int("alert.count", len(alerts)))
// alerts[h] is ready, add or update active list now
for h, a := range alerts {
// Check whether we already have alerting state for the identifying label set.
@@ -271,7 +272,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
for fp, a := range r.Active {
labelsJSON, err := json.Marshal(a.QueryResultLabels)
if err != nil {
r.logger.ErrorContext(ctx, "error marshaling labels", slog.String("rule.id", r.ID()), errors.Attr(err))
r.logger.ErrorContext(ctx, "error marshaling labels", errors.Attr(err))
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
@@ -325,7 +326,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
state = ruletypes.StateFiring
}
a.State = state
r.logger.DebugContext(ctx, "converting alert state", slog.String("rule.id", r.ID()), slog.Any("alert.state", state))
r.logger.DebugContext(ctx, "converting alert state", slog.Any("alert.state", state))
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),


@@ -2,6 +2,7 @@ package rules
import (
"context"
"runtime/debug"
"sort"
"sync"
"time"
@@ -308,7 +309,11 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
defer func() {
if r := recover(); r != nil {
g.logger.ErrorContext(ctx, "panic during threshold rule evaluation", "panic", r)
g.logger.ErrorContext(
ctx, "panic during threshold rule evaluation",
slog.Any("panic", r),
slog.String("stack", string(debug.Stack())),
)
}
}()
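
The stacktrace commit captures runtime/debug.Stack inside the recover handler, so a panic during evaluation is logged with the full stack rather than only the panic value. A compact sketch of the pattern (the evaluation callback is a stand-in):

```go
package main

import (
	"context"
	"log/slog"
	"os"
	"runtime/debug"
)

// evalWithRecovery runs eval and converts any panic into a structured log
// record that includes the panic value and the captured stack trace.
func evalWithRecovery(ctx context.Context, logger *slog.Logger, eval func()) {
	defer func() {
		if r := recover(); r != nil {
			logger.ErrorContext(
				ctx, "panic during rule evaluation",
				slog.Any("panic", r),
				slog.String("stack", string(debug.Stack())),
			)
		}
	}()
	eval()
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	evalWithRecovery(context.Background(), logger, func() {
		panic("boom") // simulated rule panic
	})
}
```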


@@ -17,7 +17,7 @@ import (
"github.com/stretchr/testify/require"
)
func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.TelemetryStore) (querier.Querier, *telemetrytypestest.MockMetadataStore) {
func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.TelemetryStore) querier.Querier {
providerSettings := instrumentationtest.New().ToProviderSettings()
metadataStore := telemetrytypestest.NewMockMetadataStore()
@@ -50,7 +50,7 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
nil, // meterStmtBuilder
nil, // traceOperatorStmtBuilder
nil, // bucketCache
), metadataStore
)
}
func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap map[string][]*telemetrytypes.TelemetryFieldKey) querier.Querier {


@@ -40,7 +40,7 @@ func NewThresholdRule(
logger *slog.Logger,
opts ...RuleOption,
) (*ThresholdRule, error) {
logger.Info("creating new ThresholdRule", "id", id)
logger.Info("creating new ThresholdRule", slog.String("rule.id", id))
opts = append(opts, WithLogger(logger))
@@ -76,7 +76,6 @@ func (r *ThresholdRule) prepareQueryRange(ctx context.Context, ts time.Time) (*q
slog.Int64("ts", ts.UnixMilli()),
slog.Int64("eval_window", r.evalWindow.Milliseconds()),
slog.Int64("eval_delay", r.evalDelay.Milliseconds()),
slog.String("rule.id", r.ID()),
)
startTs, endTs := r.Timestamps(ts)
@@ -199,7 +198,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
results = append(results, tsData)
} else {
// NOTE: should not happen but just to ensure we don't miss it if it happens for some reason
r.logger.WarnContext(ctx, "expected qbtypes.TimeSeriesData but got unexpected type", slog.String("rule.id", r.ID()), slog.String("item.type", reflect.TypeOf(item).String()))
r.logger.WarnContext(ctx, "expected qbtypes.TimeSeriesData but got unexpected type", slog.String("item.type", reflect.TypeOf(item).String()))
}
}
@@ -225,7 +224,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
var resultVector ruletypes.Vector
if queryResult == nil || len(queryResult.Aggregations) == 0 || queryResult.Aggregations[0] == nil {
r.logger.WarnContext(ctx, "query result is nil", slog.String("rule.id", r.ID()), slog.String("query.name", selectedQuery))
r.logger.WarnContext(ctx, "query result is nil", slog.String("query.name", selectedQuery))
return resultVector, nil
}
@@ -235,7 +234,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
// In case of error we log the error and continue with the original series
if filterErr != nil {
r.logger.ErrorContext(ctx, "error filtering new series", slog.String("rule.id", r.ID()), errors.Attr(filterErr))
r.logger.ErrorContext(ctx, "error filtering new series", errors.Attr(filterErr))
} else {
seriesToProcess = filteredSeries
}
@@ -243,7 +242,11 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
for _, series := range seriesToProcess {
if !r.Condition().ShouldEval(series) {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", slog.String("rule.id", r.ID()), slog.Int("series.num_points", len(series.Values)), slog.Int("series.required_points", r.Condition().RequiredNumPoints))
r.logger.InfoContext(
ctx, "not enough data points to evaluate series, skipping",
slog.Int("series.num_points", len(series.Values)),
slog.Int("series.required_points", r.Condition().RequiredNumPoints),
)
continue
}
resultSeries, err := r.Threshold.Eval(series, r.Unit(), ruletypes.EvalData{
@@ -294,7 +297,10 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
value := valueFormatter.Format(smpl.V, r.Unit())
// todo(aniket): handle different threshold
threshold := valueFormatter.Format(smpl.Target, smpl.TargetUnit)
r.logger.DebugContext(ctx, "alert template data for rule", slog.String("rule.id", r.ID()), slog.String("formatter.name", valueFormatter.Name()), slog.String("alert.value", value), slog.String("alert.threshold", threshold))
r.logger.DebugContext(
ctx, "alert template data for rule", slog.String("formatter.name", valueFormatter.Name()),
slog.String("alert.value", value), slog.String("alert.threshold", threshold),
)
tmplData := ruletypes.AlertTemplateData(l, value, threshold)
// Inject some convenience variables that are easier to remember for users
@@ -313,7 +319,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
r.logger.ErrorContext(ctx, "expanding alert template failed", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("alert.template_data", tmplData))
r.logger.ErrorContext(ctx, "expanding alert template failed", errors.Attr(err), slog.Any("alert.template_data", tmplData))
}
return result
}
@@ -345,13 +351,13 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
case ruletypes.AlertTypeTraces:
link := r.prepareLinksToTraces(ctx, ts, smpl.Metric)
if link != "" && r.hostFromSource() != "" {
r.logger.InfoContext(ctx, "adding traces link to annotations", slog.String("rule.id", r.ID()), slog.String("annotation.link", fmt.Sprintf("%s/traces-explorer?%s", r.hostFromSource(), link)))
r.logger.InfoContext(ctx, "adding traces link to annotations", slog.String("annotation.link", fmt.Sprintf("%s/traces-explorer?%s", r.hostFromSource(), link)))
annotations = append(annotations, ruletypes.Label{Name: "related_traces", Value: fmt.Sprintf("%s/traces-explorer?%s", r.hostFromSource(), link)})
}
case ruletypes.AlertTypeLogs:
link := r.prepareLinksToLogs(ctx, ts, smpl.Metric)
if link != "" && r.hostFromSource() != "" {
r.logger.InfoContext(ctx, "adding logs link to annotations", slog.String("rule.id", r.ID()), slog.String("annotation.link", fmt.Sprintf("%s/logs/logs-explorer?%s", r.hostFromSource(), link)))
r.logger.InfoContext(ctx, "adding logs link to annotations", slog.String("annotation.link", fmt.Sprintf("%s/logs/logs-explorer?%s", r.hostFromSource(), link)))
annotations = append(annotations, ruletypes.Label{Name: "related_logs", Value: fmt.Sprintf("%s/logs/logs-explorer?%s", r.hostFromSource(), link)})
}
}
@@ -378,7 +384,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
}
}
r.logger.InfoContext(ctx, "number of alerts found", slog.String("rule.id", r.ID()), slog.Int("alert.count", len(alerts)))
r.logger.InfoContext(ctx, "number of alerts found", slog.Int("alert.count", len(alerts)))
// alerts[h] is ready, add or update active list now
for h, a := range alerts {
@@ -406,7 +412,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
for fp, a := range r.Active {
labelsJSON, err := json.Marshal(a.QueryResultLabels)
if err != nil {
r.logger.ErrorContext(ctx, "error marshaling labels", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("alert.labels", a.Labels))
r.logger.ErrorContext(ctx, "error marshaling labels", errors.Attr(err), slog.Any("alert.labels", a.Labels))
}
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
@@ -415,7 +421,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
delete(r.Active, fp)
}
if a.State != ruletypes.StateInactive {
r.logger.DebugContext(ctx, "converting firing alert to inactive", slog.String("rule.id", r.ID()))
r.logger.DebugContext(ctx, "converting firing alert to inactive")
a.State = ruletypes.StateInactive
a.ResolvedAt = ts
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
@@ -433,7 +439,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
}
if a.State == ruletypes.StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration.Duration() {
r.logger.DebugContext(ctx, "converting pending alert to firing", slog.String("rule.id", r.ID()))
r.logger.DebugContext(ctx, "converting pending alert to firing")
a.State = ruletypes.StateFiring
a.FiredAt = ts
state := ruletypes.StateFiring
@@ -463,7 +469,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
state = ruletypes.StateFiring
}
a.State = state
r.logger.DebugContext(ctx, "converting alert state", slog.String("rule.id", r.ID()), slog.Any("alert.state", state))
r.logger.DebugContext(ctx, "converting alert state", slog.Any("alert.state", state))
itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
RuleID: r.ID(),
RuleName: r.Name(),


@@ -511,8 +511,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
}
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
querier, mockMetadataStore := prepareQuerierForMetrics(t, telemetryStore)
mockMetadataStore.TypeMap["signoz_calls_total"] = metrictypes.SumType
querier := prepareQuerierForMetrics(t, telemetryStore)
cols := []cmock.ColumnType{
{Name: "ts", Type: "DateTime"},
@@ -728,8 +727,7 @@ func TestThresholdRuleNoData(t *testing.T) {
WithArgs(nil, nil, nil, nil, nil, nil, nil, nil).
WillReturnRows(rows)
querier, mockMetadataStore := prepareQuerierForMetrics(t, telemetryStore)
mockMetadataStore.TypeMap["signoz_calls_total"] = metrictypes.SumType
querier := prepareQuerierForMetrics(t, telemetryStore)
var target float64 = 0
postableRule.RuleCondition.Thresholds = &ruletypes.RuleThresholdData{
@@ -1117,8 +1115,7 @@ func TestMultipleThresholdRule(t *testing.T) {
WithArgs(nil, nil, nil, nil, nil, nil, nil, nil).
WillReturnRows(rows)
querier, mockMetadataStore := prepareQuerierForMetrics(t, telemetryStore)
mockMetadataStore.TypeMap["signoz_calls_total"] = metrictypes.SumType
querier := prepareQuerierForMetrics(t, telemetryStore)
postableRule.RuleCondition.CompareOperator = c.compareOperator
postableRule.RuleCondition.MatchType = c.matchType
@@ -1906,8 +1903,7 @@ func TestThresholdEval_RequireMinPoints(t *testing.T) {
WithArgs(nil, nil, nil, nil, nil, nil, nil, nil).
WillReturnRows(rows)
querier, mockMetadataStore := prepareQuerierForMetrics(t, telemetryStore)
mockMetadataStore.TypeMap["signoz_calls_total"] = metrictypes.SumType
querier := prepareQuerierForMetrics(t, telemetryStore)
rc := postableRule.RuleCondition
rc.Target = &c.target