Mirror of https://github.com/SigNoz/signoz.git (synced 2026-04-07 20:50:26 +01:00)

Compare commits: nv/4189-2...chore/logg (5 commits)
Commits in this compare:

- 6faa86baf0
- 02a5bf832f
- 2010dd6df7
- 41529219e4
- 16a9df5244
.gitignore (vendored, 2 lines changed)
@@ -51,6 +51,8 @@ ee/query-service/tests/test-deploy/data/
 # local data
 *.backup
 *.db
+*.db-shm
+*.db-wal
 **/db
 /deploy/docker/clickhouse-setup/data/
 /deploy/docker-swarm/clickhouse-setup/data/
@@ -49,7 +49,6 @@ func NewAnomalyRule(
 	logger *slog.Logger,
 	opts ...baserules.RuleOption,
 ) (*AnomalyRule, error) {
 
-	logger.Info("creating new AnomalyRule", slog.String("rule.id", id))
 
 	opts = append(opts, baserules.WithLogger(logger))
@@ -59,44 +58,44 @@ func NewAnomalyRule(
 		return nil, err
 	}
 
-	t := AnomalyRule{
+	r := AnomalyRule{
 		BaseRule: baseRule,
+		querier:  querier,
+		version:  p.Version,
+		logger:   logger.With(slog.String("rule.id", id)),
 	}
 
 	switch p.RuleCondition.Seasonality {
 	case ruletypes.SeasonalityHourly:
-		t.seasonality = anomaly.SeasonalityHourly
+		r.seasonality = anomaly.SeasonalityHourly
 	case ruletypes.SeasonalityDaily:
-		t.seasonality = anomaly.SeasonalityDaily
+		r.seasonality = anomaly.SeasonalityDaily
 	case ruletypes.SeasonalityWeekly:
-		t.seasonality = anomaly.SeasonalityWeekly
+		r.seasonality = anomaly.SeasonalityWeekly
 	default:
-		t.seasonality = anomaly.SeasonalityDaily
+		r.seasonality = anomaly.SeasonalityDaily
 	}
 
-	logger.Info("using seasonality", slog.String("rule.id", id), slog.String("rule.seasonality", t.seasonality.StringValue()))
+	r.logger.Info("using seasonality", slog.String("rule.seasonality", r.seasonality.StringValue()))
 
-	if t.seasonality == anomaly.SeasonalityHourly {
-		t.provider = anomaly.NewHourlyProvider(
+	if r.seasonality == anomaly.SeasonalityHourly {
+		r.provider = anomaly.NewHourlyProvider(
 			anomaly.WithQuerier[*anomaly.HourlyProvider](querier),
-			anomaly.WithLogger[*anomaly.HourlyProvider](logger),
+			anomaly.WithLogger[*anomaly.HourlyProvider](r.logger),
 		)
-	} else if t.seasonality == anomaly.SeasonalityDaily {
-		t.provider = anomaly.NewDailyProvider(
+	} else if r.seasonality == anomaly.SeasonalityDaily {
+		r.provider = anomaly.NewDailyProvider(
 			anomaly.WithQuerier[*anomaly.DailyProvider](querier),
-			anomaly.WithLogger[*anomaly.DailyProvider](logger),
+			anomaly.WithLogger[*anomaly.DailyProvider](r.logger),
 		)
-	} else if t.seasonality == anomaly.SeasonalityWeekly {
-		t.provider = anomaly.NewWeeklyProvider(
+	} else if r.seasonality == anomaly.SeasonalityWeekly {
+		r.provider = anomaly.NewWeeklyProvider(
 			anomaly.WithQuerier[*anomaly.WeeklyProvider](querier),
-			anomaly.WithLogger[*anomaly.WeeklyProvider](logger),
+			anomaly.WithLogger[*anomaly.WeeklyProvider](r.logger),
 		)
 	}
 
-	t.querier = querier
-	t.version = p.Version
-	t.logger = logger
-	return &t, nil
+	return &r, nil
 }
 
 func (r *AnomalyRule) Type() ruletypes.RuleType {
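The rename from `t` to `r` aside, the substantive change above is that `rule.id` is attached to the logger once, at construction. A minimal runnable sketch of that pattern using only the standard log/slog package (the rule type and ID here are illustrative, not the SigNoz ones):

```go
package main

import (
	"log/slog"
	"os"
)

type rule struct {
	id     string
	logger *slog.Logger
}

func newRule(id string, logger *slog.Logger) *rule {
	return &rule{
		id: id,
		// Attach rule.id once; every later log call inherits it.
		logger: logger.With(slog.String("rule.id", id)),
	}
}

func main() {
	r := newRule("rule-42", slog.New(slog.NewJSONHandler(os.Stdout, nil)))
	// Emits {"msg":"running query","rule.id":"rule-42",...} without
	// repeating the attribute at the call site.
	r.logger.Info("running query")
}
```

This is why the hunks that follow can drop `slog.String("rule.id", r.ID())` from individual calls without losing the field in the output.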
@@ -104,8 +103,11 @@ func (r *AnomalyRule) Type() ruletypes.RuleType {
 }
 
 func (r *AnomalyRule) prepareQueryRange(ctx context.Context, ts time.Time) *qbtypes.QueryRangeRequest {
 
-	r.logger.InfoContext(ctx, "prepare query range request", slog.String("rule.id", r.ID()), slog.Int64("ts", ts.UnixMilli()), slog.Int64("eval.window_ms", r.EvalWindow().Milliseconds()), slog.Int64("eval.delay_ms", r.EvalDelay().Milliseconds()))
+	r.logger.InfoContext(
+		ctx, "prepare query range request", slog.Int64("ts", ts.UnixMilli()),
+		slog.Int64("eval.window_ms", r.EvalWindow().Milliseconds()),
+		slog.Int64("eval.delay_ms", r.EvalDelay().Milliseconds()),
+	)
 
 	startTs, endTs := r.Timestamps(ts)
 	start, end := startTs.UnixMilli(), endTs.UnixMilli()
@@ -145,7 +147,7 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
 	}
 
 	if queryResult == nil {
-		r.logger.WarnContext(ctx, "nil qb result", slog.String("rule.id", r.ID()), slog.Int64("ts", ts.UnixMilli()))
+		r.logger.WarnContext(ctx, "nil qb result", slog.Int64("ts", ts.UnixMilli()))
 		return ruletypes.Vector{}, nil
 	}
 
@@ -156,7 +158,7 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
 	if missingDataAlert := r.HandleMissingDataAlert(ctx, ts, hasData); missingDataAlert != nil {
 		return ruletypes.Vector{*missingDataAlert}, nil
 	} else if !hasData {
-		r.logger.WarnContext(ctx, "no anomaly result", slog.String("rule.id", r.ID()))
+		r.logger.WarnContext(ctx, "no anomaly result")
 		return ruletypes.Vector{}, nil
 	}
 
@@ -164,7 +166,7 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
 
 	scoresJSON, _ := json.Marshal(queryResult.Aggregations[0].AnomalyScores)
 	// TODO(srikanthccv): this could be noisy but we do this to answer false alert requests
-	r.logger.InfoContext(ctx, "anomaly scores", slog.String("rule.id", r.ID()), slog.String("anomaly.scores", string(scoresJSON)))
+	r.logger.InfoContext(ctx, "anomaly scores", slog.String("anomaly.scores", string(scoresJSON)))
 
 	// Filter out new series if newGroupEvalDelay is configured
 	seriesToProcess := queryResult.Aggregations[0].AnomalyScores
@@ -172,7 +174,7 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
 		filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
 		// In case of error we log the error and continue with the original series
 		if filterErr != nil {
-			r.logger.ErrorContext(ctx, "error filtering new series", slog.String("rule.id", r.ID()), errors.Attr(filterErr))
+			r.logger.ErrorContext(ctx, "error filtering new series", errors.Attr(filterErr))
 		} else {
 			seriesToProcess = filteredSeries
 		}
@@ -180,7 +182,11 @@ func (r *AnomalyRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID, t
 
 	for _, series := range seriesToProcess {
 		if !r.Condition().ShouldEval(series) {
-			r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", slog.String("rule.id", r.ID()), slog.Int("series.num_points", len(series.Values)), slog.Int("series.required_points", r.Condition().RequiredNumPoints))
+			r.logger.InfoContext(
+				ctx, "not enough data points to evaluate series, skipping",
+				slog.Int("series.num_points", len(series.Values)),
+				slog.Int("series.required_points", r.Condition().RequiredNumPoints),
+			)
 			continue
 		}
 		results, err := r.Threshold.Eval(series, r.Unit(), ruletypes.EvalData{
@@ -204,7 +210,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 	var res ruletypes.Vector
 	var err error
 
-	r.logger.InfoContext(ctx, "running query", slog.String("rule.id", r.ID()))
+	r.logger.InfoContext(ctx, "running query")
 	res, err = r.buildAndRunQuery(ctx, r.OrgID(), ts)
 
 	if err != nil {
@@ -230,7 +236,10 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		}
 		value := valueFormatter.Format(smpl.V, r.Unit())
 		threshold := valueFormatter.Format(smpl.Target, smpl.TargetUnit)
-		r.logger.DebugContext(ctx, "alert template data for rule", slog.String("rule.id", r.ID()), slog.String("formatter.name", valueFormatter.Name()), slog.String("alert.value", value), slog.String("alert.threshold", threshold))
+		r.logger.DebugContext(
+			ctx, "alert template data for rule", slog.String("formatter.name", valueFormatter.Name()),
+			slog.String("alert.value", value), slog.String("alert.threshold", threshold),
+		)
 
 		tmplData := ruletypes.AlertTemplateData(l, value, threshold)
 		// Inject some convenience variables that are easier to remember for users
@@ -250,7 +259,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		result, err := tmpl.Expand()
 		if err != nil {
 			result = fmt.Sprintf("<error expanding template: %s>", err)
-			r.logger.ErrorContext(ctx, "expanding alert template failed", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("alert.template_data", tmplData))
+			r.logger.ErrorContext(ctx, "expanding alert template failed", errors.Attr(err), slog.Any("alert.template_data", tmplData))
 		}
 		return result
 	}
@@ -280,7 +289,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		resultFPs[h] = struct{}{}
 
 		if _, ok := alerts[h]; ok {
-			r.logger.ErrorContext(ctx, "the alert query returns duplicate records", slog.String("rule.id", r.ID()), slog.Any("alert", alerts[h]))
+			r.logger.ErrorContext(ctx, "the alert query returns duplicate records", slog.Any("alert", alerts[h]))
 			err = errors.NewInternalf(errors.CodeInternal, "duplicate alert found, vector contains metrics with the same labelset after applying alert labels")
 			return 0, err
 		}
@@ -299,7 +308,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		}
 	}
 
-	r.logger.InfoContext(ctx, "number of alerts found", slog.String("rule.id", r.ID()), slog.Int("alert.count", len(alerts)))
+	r.logger.InfoContext(ctx, "number of alerts found", slog.Int("alert.count", len(alerts)))
 	// alerts[h] is ready, add or update active list now
 	for h, a := range alerts {
 		// Check whether we already have alerting state for the identifying label set.
@@ -326,7 +335,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 	for fp, a := range r.Active {
 		labelsJSON, err := json.Marshal(a.QueryResultLabels)
 		if err != nil {
-			r.logger.ErrorContext(ctx, "error marshaling labels", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("alert.labels", a.Labels))
+			r.logger.ErrorContext(ctx, "error marshaling labels", errors.Attr(err), slog.Any("alert.labels", a.Labels))
 		}
 		if _, ok := resultFPs[fp]; !ok {
 			// If the alert was previously firing, keep it around for a given
@@ -381,7 +390,7 @@ func (r *AnomalyRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 			state = ruletypes.StateFiring
 		}
 		a.State = state
-		r.logger.DebugContext(ctx, "converting alert state", slog.String("rule.id", r.ID()), slog.Any("alert.state", state))
+		r.logger.DebugContext(ctx, "converting alert state", slog.Any("alert.state", state))
 		itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
 			RuleID:   r.ID(),
 			RuleName: r.Name(),
@@ -328,7 +328,12 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *dispatch.Route) {
 	go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
 		_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
 		if err != nil {
-			logger := d.logger.With(slog.Int("num_alerts", len(alerts)), errors.Attr(err))
+			receiverName, _ := notify.ReceiverName(ctx)
+			logger := d.logger.With(
+				slog.String("receiver", receiverName),
+				slog.Int("num_alerts", len(alerts)),
+				errors.Attr(err),
+			)
 			if errors.Is(ctx.Err(), context.Canceled) {
 				// It is expected for the context to be canceled on
 				// configuration reload or shutdown. In this case, the
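The hunk enriches the error logger with the receiver name pulled from the context (notify.ReceiverName is the upstream alertmanager accessor) before branching on the cancellation check. A standalone sketch of that branch, using the standard errors package in place of SigNoz's wrapper and a hypothetical helper name:

```go
package dispatch

import (
	"context"
	"errors"
	"log/slog"
)

// logExecErr mirrors the branch above: a context canceled during
// configuration reload or shutdown is expected, so it is not reported
// as a notification failure.
func logExecErr(ctx context.Context, logger *slog.Logger, err error) {
	if errors.Is(ctx.Err(), context.Canceled) {
		logger.DebugContext(ctx, "notify pipeline canceled", slog.Any("error", err))
		return
	}
	logger.ErrorContext(ctx, "notify pipeline failed", slog.Any("error", err))
}
```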
@@ -383,15 +383,15 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
 					spec.Aggregations[i].Temporality = temp
 				}
 			}
-			if spec.Aggregations[i].Temporality == metrictypes.Unknown {
-				missingMetrics = append(missingMetrics, spec.Aggregations[i].MetricName)
-				continue
-			}
 			if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
 				if foundMetricType, ok := metricTypes[spec.Aggregations[i].MetricName]; ok && foundMetricType != metrictypes.UnspecifiedType {
 					spec.Aggregations[i].Type = foundMetricType
 				}
 			}
+			if spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
+				missingMetrics = append(missingMetrics, spec.Aggregations[i].MetricName)
+				continue
+			}
 			presentAggregations = append(presentAggregations, spec.Aggregations[i])
 		}
 		if len(presentAggregations) == 0 {
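The hunk swaps the missing-metric test: instead of rejecting aggregations whose temporality stayed unknown, it resolves the metric type (request value first, then the metadata store) and rejects only those still unspecified. The resolution order in isolation, as a string-typed sketch (the real code uses metrictypes values):

```go
package querier

// resolveMetricType mirrors the fallback order above. declared is what the
// request carried; metricTypes is the metadata-store lookup result.
func resolveMetricType(name, declared string, metricTypes map[string]string) (string, bool) {
	if declared != "" {
		return declared, true // caller-specified type wins
	}
	if t, ok := metricTypes[name]; ok && t != "" {
		return t, true // fall back to the metadata store
	}
	return "", false // still unspecified: treat the metric as missing
}
```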
@@ -1,145 +0,0 @@
-package querier
-
-import (
-	"context"
-	"testing"
-	"time"
-
-	cmock "github.com/srikanthccv/ClickHouse-go-mock"
-
-	"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
-	"github.com/SigNoz/signoz/pkg/telemetrystore"
-	"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
-	"github.com/SigNoz/signoz/pkg/types/metrictypes"
-	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
-	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
-	"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
-	"github.com/SigNoz/signoz/pkg/valuer"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-)
-
-type queryMatcherAny struct{}
-
-func (m *queryMatcherAny) Match(string, string) error { return nil }
-
-// mockMetricStmtBuilder implements qbtypes.StatementBuilder[qbtypes.MetricAggregation]
-// and returns a fixed query string so the mock ClickHouse can match it.
-type mockMetricStmtBuilder struct{}
-
-func (m *mockMetricStmtBuilder) Build(_ context.Context, _, _ uint64, _ qbtypes.RequestType, _ qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], _ map[string]qbtypes.VariableItem) (*qbtypes.Statement, error) {
-	return &qbtypes.Statement{
-		Query: "SELECT ts, value FROM signoz_metrics",
-		Args:  nil,
-	}, nil
-}
-
-func TestQueryRange_MetricTypeMissing(t *testing.T) {
-	// When a metric has UnspecifiedType and is not found in the metadata store,
-	// the querier should return a not-found error, even if the request provides a temporality
-	providerSettings := instrumentationtest.New().ToProviderSettings()
-	metadataStore := telemetrytypestest.NewMockMetadataStore()
-
-	q := New(
-		providerSettings,
-		nil, // telemetryStore
-		metadataStore,
-		nil, // prometheus
-		nil, // traceStmtBuilder
-		nil, // logStmtBuilder
-		nil, // metricStmtBuilder
-		nil, // meterStmtBuilder
-		nil, // traceOperatorStmtBuilder
-		nil, // bucketCache
-	)
-
-	req := &qbtypes.QueryRangeRequest{
-		Start:       uint64(time.Now().Add(-5 * time.Minute).UnixMilli()),
-		End:         uint64(time.Now().UnixMilli()),
-		RequestType: qbtypes.RequestTypeTimeSeries,
-		CompositeQuery: qbtypes.CompositeQuery{
-			Queries: []qbtypes.QueryEnvelope{{
-				Type: qbtypes.QueryTypeBuilder,
-				Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
-					Name:         "A",
-					StepInterval: qbtypes.Step{Duration: time.Minute},
-					Aggregations: []qbtypes.MetricAggregation{
-						{
-							MetricName:       "unknown_metric",
-							Temporality:      metrictypes.Cumulative,
-							TimeAggregation:  metrictypes.TimeAggregationRate,
-							SpaceAggregation: metrictypes.SpaceAggregationSum,
-						},
-					},
-					Signal: telemetrytypes.SignalMetrics,
-				},
-			}},
-		},
-	}
-
-	_, err := q.QueryRange(context.Background(), valuer.GenerateUUID(), req)
-	require.Error(t, err)
-	assert.Contains(t, err.Error(), "could not find the metric unknown_metric")
-}
-
-func TestQueryRange_MetricTypeFromStore(t *testing.T) {
-	// When a metric has UnspecifiedType but the metadata store returns a valid type,
-	// the metric should not be treated as missing.
-	providerSettings := instrumentationtest.New().ToProviderSettings()
-	metadataStore := telemetrytypestest.NewMockMetadataStore()
-	metadataStore.TypeMap["my_metric"] = metrictypes.SumType
-	metadataStore.TemporalityMap["my_metric"] = metrictypes.Cumulative
-
-	telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
-
-	cols := []cmock.ColumnType{
-		{Name: "ts", Type: "DateTime"},
-		{Name: "value", Type: "Float64"},
-	}
-	rows := cmock.NewRows(cols, [][]any{
-		{time.Now(), float64(42)},
-	})
-	telemetryStore.Mock().
-		ExpectQuery("SELECT any").
-		WillReturnRows(rows)
-
-	q := New(
-		providerSettings,
-		telemetryStore,
-		metadataStore,
-		nil,                      // prometheus
-		nil,                      // traceStmtBuilder
-		nil,                      // logStmtBuilder
-		&mockMetricStmtBuilder{}, // metricStmtBuilder
-		nil,                      // meterStmtBuilder
-		nil,                      // traceOperatorStmtBuilder
-		nil,                      // bucketCache
-	)
-
-	req := &qbtypes.QueryRangeRequest{
-		Start:       uint64(time.Now().Add(-5 * time.Minute).UnixMilli()),
-		End:         uint64(time.Now().UnixMilli()),
-		RequestType: qbtypes.RequestTypeTimeSeries,
-		CompositeQuery: qbtypes.CompositeQuery{
-			Queries: []qbtypes.QueryEnvelope{{
-				Type: qbtypes.QueryTypeBuilder,
-				Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
-					Name:         "A",
-					StepInterval: qbtypes.Step{Duration: time.Minute},
-					Aggregations: []qbtypes.MetricAggregation{
-						{
-							MetricName:       "my_metric",
-							TimeAggregation:  metrictypes.TimeAggregationRate,
-							SpaceAggregation: metrictypes.SpaceAggregationSum,
-						},
-					},
-					Signal: telemetrytypes.SignalMetrics,
-				},
-			}},
-		},
-	}
-
-	resp, err := q.QueryRange(context.Background(), valuer.GenerateUUID(), req)
-	require.NoError(t, err)
-	require.NotNil(t, resp)
-}
@@ -110,7 +110,7 @@ func WithEvalDelay(dur valuer.TextDuration) RuleOption {
 
 func WithLogger(logger *slog.Logger) RuleOption {
 	return func(r *BaseRule) {
-		r.logger = logger
+		r.logger = logger.With(slog.String("rule.id", r.id))
 	}
 }
 
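WithLogger now reads r.id when the option runs, which only works if options are applied after the rule's ID is assigned. A minimal sketch of that ordering constraint, with simplified types (not the real constructor):

```go
package baserules

import "log/slog"

type BaseRule struct {
	id     string
	logger *slog.Logger
}

type RuleOption func(*BaseRule)

func NewBaseRule(id string, logger *slog.Logger, opts ...RuleOption) *BaseRule {
	r := &BaseRule{id: id, logger: logger}
	// Options run after r.id is set, so WithLogger sees the real ID.
	for _, opt := range opts {
		opt(r)
	}
	return r
}
```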
@@ -248,7 +248,7 @@ func (r *BaseRule) SelectedQuery(ctx context.Context) string {
 	if r.ruleCondition.SelectedQuery != "" {
 		return r.ruleCondition.SelectedQuery
 	}
-	r.logger.WarnContext(ctx, "missing selected query", slog.String("rule.id", r.ID()))
+	r.logger.WarnContext(ctx, "missing selected query")
 	return r.ruleCondition.SelectedQueryName()
 }
 
@@ -386,7 +386,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
 	}
 
 	if err := r.ruleStateHistoryModule.RecordRuleStateHistory(ctx, r.ID(), r.handledRestart, itemsToAdd); err != nil {
-		r.logger.ErrorContext(ctx, "error while recording rule state history", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("items_to_add", itemsToAdd))
+		r.logger.ErrorContext(ctx, "error while recording rule state history", errors.Attr(err), slog.Any("items_to_add", itemsToAdd))
		return err
 	}
 	r.handledRestart = true
@@ -580,7 +580,12 @@ func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*
 		// Check if first_seen + delay has passed
 		if maxFirstSeen+newGroupEvalDelayMs > evalTimeMs {
 			// Still within grace period, skip this series
-			r.logger.InfoContext(ctx, "skipping new series", slog.String("rule.id", r.ID()), slog.Int("series.index", i), slog.Int64("series.max_first_seen", maxFirstSeen), slog.Int64("eval.time_ms", evalTimeMs), slog.Int64("eval.delay_ms", newGroupEvalDelayMs), slog.Any("series.labels", series[i].Labels))
+			r.logger.InfoContext(
+				ctx, "skipping new series",
+				slog.Int("series.index", i), slog.Int64("series.max_first_seen", maxFirstSeen),
+				slog.Int64("eval.time_ms", evalTimeMs), slog.Int64("eval.delay_ms", newGroupEvalDelayMs),
+				slog.Any("series.labels", series[i].Labels),
+			)
 			continue
 		}
 
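The grace-period test above reduces to one comparison; a series is skipped while its newest first-seen timestamp plus the configured delay is still ahead of the evaluation time. As a sketch:

```go
// withinGracePeriod reduces the check above to its arithmetic. All values are
// Unix milliseconds, matching the *_ms attributes logged in the hunk.
func withinGracePeriod(maxFirstSeenMs, newGroupEvalDelayMs, evalTimeMs int64) bool {
	return maxFirstSeenMs+newGroupEvalDelayMs > evalTimeMs
}
```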
@@ -590,7 +595,11 @@ func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*
 
 	skippedCount := len(series) - len(filteredSeries)
 	if skippedCount > 0 {
-		r.logger.InfoContext(ctx, "filtered new series", slog.String("rule.id", r.ID()), slog.Int("series.skipped_count", skippedCount), slog.Int("series.total_count", len(series)), slog.Int64("eval.delay_ms", newGroupEvalDelayMs))
+		r.logger.InfoContext(
+			ctx, "filtered new series",
+			slog.Int("series.skipped_count", skippedCount), slog.Int("series.total_count", len(series)),
+			slog.Int64("eval.delay_ms", newGroupEvalDelayMs),
+		)
 	}
 
 	return filteredSeries, nil
@@ -611,7 +620,7 @@ func (r *BaseRule) HandleMissingDataAlert(ctx context.Context, ts time.Time, has
 		return nil
 	}
 
-	r.logger.InfoContext(ctx, "no data found for rule condition", slog.String("rule.id", r.ID()))
+	r.logger.InfoContext(ctx, "no data found for rule condition")
 	lbls := ruletypes.NewBuilder()
 	if !r.lastTimestampWithDatapoints.IsZero() {
 		lbls.Set(ruletypes.LabelLastSeen, r.lastTimestampWithDatapoints.Format(ruletypes.AlertTimeFormat))
@@ -438,7 +438,7 @@ func (m *Manager) editTask(_ context.Context, orgID valuer.UUID, rule *ruletypes
 		Logger:      m.opts.Logger,
 		Cache:       m.cache,
 		ManagerOpts: m.opts,
-		NotifyFunc:  m.prepareNotifyFunc(),
+		NotifyFunc:  m.notifyFunc,
 		SQLStore:    m.sqlstore,
 		OrgID:       orgID,
 	})
@@ -651,7 +651,7 @@ func (m *Manager) addTask(_ context.Context, orgID valuer.UUID, rule *ruletypes.
 		Logger:      m.opts.Logger,
 		Cache:       m.cache,
 		ManagerOpts: m.opts,
-		NotifyFunc:  m.prepareNotifyFunc(),
+		NotifyFunc:  m.notifyFunc,
 		SQLStore:    m.sqlstore,
 		OrgID:       orgID,
 	})
@@ -754,68 +754,63 @@ func (m *Manager) TriggeredAlerts() []*ruletypes.NamedAlert {
 // NotifyFunc sends notifications about a set of alerts generated by the given expression.
 type NotifyFunc func(ctx context.Context, orgID string, expr string, alerts ...*ruletypes.Alert)
 
-// prepareNotifyFunc implements the NotifyFunc for a Notifier.
-func (m *Manager) prepareNotifyFunc() NotifyFunc {
-	return func(ctx context.Context, orgID string, expr string, alerts ...*ruletypes.Alert) {
-		var res []*alertmanagertypes.PostableAlert
+// notifyFunc implements the NotifyFunc for a Notifier.
+func (m *Manager) notifyFunc(ctx context.Context, orgID string, expr string, alerts ...*ruletypes.Alert) {
+	var res []*alertmanagertypes.PostableAlert
 
-		for _, alert := range alerts {
-			generatorURL := alert.GeneratorURL
+	for _, alert := range alerts {
+		generatorURL := alert.GeneratorURL
 
-			a := &alertmanagertypes.PostableAlert{
-				Annotations: alert.Annotations.Map(),
-				StartsAt:    strfmt.DateTime(alert.FiredAt),
-				Alert: alertmanagertypes.AlertModel{
-					Labels:       alert.Labels.Map(),
-					GeneratorURL: strfmt.URI(generatorURL),
-				},
-			}
-			if !alert.ResolvedAt.IsZero() {
-				a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
-			} else {
-				a.EndsAt = strfmt.DateTime(alert.ValidUntil)
-			}
-
-			res = append(res, a)
+		a := &alertmanagertypes.PostableAlert{
+			Annotations: alert.Annotations.Map(),
+			StartsAt:    strfmt.DateTime(alert.FiredAt),
+			Alert: alertmanagertypes.AlertModel{
+				Labels:       alert.Labels.Map(),
+				GeneratorURL: strfmt.URI(generatorURL),
+			},
+		}
+		if !alert.ResolvedAt.IsZero() {
+			a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
+		} else {
+			a.EndsAt = strfmt.DateTime(alert.ValidUntil)
 		}
 
-		if len(alerts) > 0 {
-			m.alertmanager.PutAlerts(ctx, orgID, res)
-		}
+		res = append(res, a)
 	}
-}
 
-func (m *Manager) prepareTestNotifyFunc() NotifyFunc {
-	return func(ctx context.Context, orgID string, expr string, alerts ...*ruletypes.Alert) {
-		if len(alerts) == 0 {
-			return
-		}
-		ruleID := alerts[0].Labels.Map()[ruletypes.AlertRuleIDLabel]
-		receiverMap := make(map[*alertmanagertypes.PostableAlert][]string)
-		for _, alert := range alerts {
-			generatorURL := alert.GeneratorURL
+	if len(alerts) > 0 {
+		m.alertmanager.PutAlerts(ctx, orgID, res)
+	}
+}
 
-			a := &alertmanagertypes.PostableAlert{}
-			a.Annotations = alert.Annotations.Map()
-			a.StartsAt = strfmt.DateTime(alert.FiredAt)
-			labelsMap := alert.Labels.Map()
-			labelsMap[ruletypes.TestAlertLabel] = "true"
-			a.Alert = alertmanagertypes.AlertModel{
-				Labels:       labelsMap,
-				GeneratorURL: strfmt.URI(generatorURL),
-			}
-			if !alert.ResolvedAt.IsZero() {
-				a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
-			} else {
-				a.EndsAt = strfmt.DateTime(alert.ValidUntil)
-			}
-			receiverMap[a] = alert.Receivers
+func (m *Manager) testNotifyFunc(ctx context.Context, orgID string, expr string, alerts ...*ruletypes.Alert) {
+	if len(alerts) == 0 {
+		return
+	}
+	ruleID := alerts[0].Labels.Map()[ruletypes.AlertRuleIDLabel]
+	receiverMap := make(map[*alertmanagertypes.PostableAlert][]string)
+	for _, alert := range alerts {
+		generatorURL := alert.GeneratorURL
 
+		a := &alertmanagertypes.PostableAlert{}
+		a.Annotations = alert.Annotations.Map()
+		a.StartsAt = strfmt.DateTime(alert.FiredAt)
+		labelsMap := alert.Labels.Map()
+		labelsMap[ruletypes.TestAlertLabel] = "true"
+		a.Alert = alertmanagertypes.AlertModel{
+			Labels:       labelsMap,
+			GeneratorURL: strfmt.URI(generatorURL),
+		}
-		}
-		err := m.alertmanager.TestAlert(ctx, orgID, ruleID, receiverMap)
-		if err != nil {
-			m.logger.ErrorContext(ctx, "failed to send test notification", errors.Attr(err))
-			return
-		}
+		if !alert.ResolvedAt.IsZero() {
+			a.EndsAt = strfmt.DateTime(alert.ResolvedAt)
+		} else {
+			a.EndsAt = strfmt.DateTime(alert.ValidUntil)
+		}
+		receiverMap[a] = alert.Receivers
 	}
+	err := m.alertmanager.TestAlert(ctx, orgID, ruleID, receiverMap)
+	if err != nil {
+		m.logger.ErrorContext(ctx, "failed to send test notification", errors.Attr(err))
+	}
 }
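The refactor works because a method value such as m.notifyFunc already closes over its receiver and satisfies the NotifyFunc type directly, so the prepare* factories returning closures were needless indirection. In miniature, with trimmed signatures:

```go
package main

import (
	"context"
	"fmt"
)

type NotifyFunc func(ctx context.Context, orgID string, expr string)

type Manager struct{ name string }

func (m *Manager) notifyFunc(ctx context.Context, orgID string, expr string) {
	fmt.Println(m.name, orgID, expr)
}

func main() {
	m := &Manager{name: "mgr"}
	// Method value: the receiver is captured, no wrapper closure needed.
	var f NotifyFunc = m.notifyFunc
	f(context.Background(), "org-1", "avg() > 0.5")
}
```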
@@ -1041,7 +1036,7 @@ func (m *Manager) TestNotification(ctx context.Context, orgID valuer.UUID, ruleS
 		Logger:      m.opts.Logger,
 		Cache:       m.cache,
 		ManagerOpts: m.opts,
-		NotifyFunc:  m.prepareTestNotifyFunc(),
+		NotifyFunc:  m.testNotifyFunc,
 		SQLStore:    m.sqlstore,
 		OrgID:       orgID,
 	})
@@ -48,14 +48,13 @@ func NewPromRule(
 		version:    postableRule.Version,
 		prometheus: prometheus,
 	}
-	p.logger = logger
 
 	query, err := p.getPqlQuery(context.Background())
 	if err != nil {
 		// can not generate a valid prom QL query
 		return nil, err
 	}
-	logger.Info("creating new prom rule", slog.String("rule.id", id), slog.String("rule.query", query))
+	p.logger.Info("creating new prom rule", slog.String("rule.query", query))
 	return &p, nil
 }
 
@@ -97,7 +96,7 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
 	if err != nil {
 		return nil, err
 	}
-	r.logger.InfoContext(ctx, "evaluating promql query", slog.String("rule.id", r.ID()), slog.String("rule.query", q))
+	r.logger.InfoContext(ctx, "evaluating promql query", slog.String("rule.query", q))
 	res, err := r.RunAlertQuery(ctx, q, start, end, interval)
 	if err != nil {
 		r.SetHealth(ruletypes.HealthBad)
@@ -117,7 +116,7 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
 	filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, matrixToProcess)
 	// In case of error we log the error and continue with the original series
 	if filterErr != nil {
-		r.logger.ErrorContext(ctx, "error filtering new series", slog.String("rule.id", r.ID()), errors.Attr(filterErr))
+		r.logger.ErrorContext(ctx, "error filtering new series", errors.Attr(filterErr))
 	} else {
 		matrixToProcess = filteredSeries
 	}
@@ -129,7 +128,9 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
 		if !r.Condition().ShouldEval(series) {
 			r.logger.InfoContext(
 				ctx, "not enough data points to evaluate series, skipping",
-				"rule.id", r.ID(), "num_points", len(series.Values), "required_points", r.Condition().RequiredNumPoints,
+				slog.String("rule.id", r.ID()),
+				slog.Int("num_points", len(series.Values)),
+				slog.Int("required_points", r.Condition().RequiredNumPoints),
 			)
 			continue
 		}
@@ -173,7 +174,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		for _, lbl := range result.Metric {
 			l[lbl.Name] = lbl.Value
 		}
-		r.logger.DebugContext(ctx, "alerting for series", slog.String("rule.id", r.ID()), slog.Any("series", result))
+		r.logger.DebugContext(ctx, "alerting for series", slog.Any("series", result))
 
 		threshold := valueFormatter.Format(result.Target, result.TargetUnit)
 
@@ -193,7 +194,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		result, err := tmpl.Expand()
 		if err != nil {
 			result = fmt.Sprintf("<error expanding template: %s>", err)
-			r.logger.WarnContext(ctx, "expanding alert template failed", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("alert.template_data", tmplData))
+			r.logger.WarnContext(ctx, "expanding alert template failed", errors.Attr(err), slog.Any("alert.template_data", tmplData))
 		}
 		return result
 	}
@@ -244,7 +245,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		}
 	}
 
-	r.logger.InfoContext(ctx, "number of alerts found", slog.String("rule.id", r.ID()), slog.Int("alert.count", len(alerts)))
+	r.logger.InfoContext(ctx, "number of alerts found", slog.Int("alert.count", len(alerts)))
 	// alerts[h] is ready, add or update active list now
 	for h, a := range alerts {
 		// Check whether we already have alerting state for the identifying label set.
@@ -271,7 +272,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 	for fp, a := range r.Active {
 		labelsJSON, err := json.Marshal(a.QueryResultLabels)
 		if err != nil {
-			r.logger.ErrorContext(ctx, "error marshaling labels", slog.String("rule.id", r.ID()), errors.Attr(err))
+			r.logger.ErrorContext(ctx, "error marshaling labels", errors.Attr(err))
 		}
 		if _, ok := resultFPs[fp]; !ok {
 			// If the alert was previously firing, keep it around for a given
@@ -325,7 +326,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 			state = ruletypes.StateFiring
 		}
 		a.State = state
-		r.logger.DebugContext(ctx, "converting alert state", slog.String("rule.id", r.ID()), slog.Any("alert.state", state))
+		r.logger.DebugContext(ctx, "converting alert state", slog.Any("alert.state", state))
 		itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
 			RuleID:   r.ID(),
 			RuleName: r.Name(),
@@ -2,6 +2,7 @@ package rules
 
 import (
 	"context"
+	"runtime/debug"
 	"sort"
 	"sync"
 	"time"
@@ -308,7 +309,11 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
 
 	defer func() {
 		if r := recover(); r != nil {
-			g.logger.ErrorContext(ctx, "panic during threshold rule evaluation", "panic", r)
+			g.logger.ErrorContext(
+				ctx, "panic during threshold rule evaluation",
+				slog.Any("panic", r),
+				slog.String("stack", string(debug.Stack())),
+			)
 		}
 	}()
 
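The new import and the expanded recover block capture the stack at the point of the panic; recover alone would discard it. Reduced to a runnable sketch:

```go
package main

import (
	"log/slog"
	"runtime/debug"
)

func eval(logger *slog.Logger) {
	defer func() {
		if r := recover(); r != nil {
			logger.Error(
				"panic during rule evaluation",
				slog.Any("panic", r),
				// debug.Stack returns the calling goroutine's stack trace,
				// preserving the crash site that recover() would otherwise hide.
				slog.String("stack", string(debug.Stack())),
			)
		}
	}()
	panic("boom")
}

func main() {
	eval(slog.Default()) // logs the panic and the stack, then returns normally
}
```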
@@ -17,7 +17,7 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.TelemetryStore) (querier.Querier, *telemetrytypestest.MockMetadataStore) {
+func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.TelemetryStore) querier.Querier {
 	providerSettings := instrumentationtest.New().ToProviderSettings()
 	metadataStore := telemetrytypestest.NewMockMetadataStore()
 
@@ -50,7 +50,7 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
 		nil, // meterStmtBuilder
 		nil, // traceOperatorStmtBuilder
 		nil, // bucketCache
-	), metadataStore
+	)
 }
 
 func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap map[string][]*telemetrytypes.TelemetryFieldKey) querier.Querier {
@@ -40,7 +40,7 @@ func NewThresholdRule(
 	logger *slog.Logger,
 	opts ...RuleOption,
 ) (*ThresholdRule, error) {
-	logger.Info("creating new ThresholdRule", "id", id)
+	logger.Info("creating new ThresholdRule", slog.String("rule.id", id))
 
 	opts = append(opts, WithLogger(logger))
 
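The switch from the loose `"id", id` pair to slog.String is more than style: alternating key-value arguments are only checked at runtime (a malformed list shows up as !BADKEY in the output), while a typed attr makes the key and value type explicit at the call site. Side by side:

```go
package main

import (
	"log/slog"
	"os"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	id := "rule-42"

	logger.Info("creating new ThresholdRule", "id", id)                   // loose pair: works, but unchecked
	logger.Info("creating new ThresholdRule", slog.String("rule.id", id)) // typed attr
	logger.Info("creating new ThresholdRule", "id")                       // odd arg count: rendered as !BADKEY=id
}
```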
@@ -76,7 +76,6 @@ func (r *ThresholdRule) prepareQueryRange(ctx context.Context, ts time.Time) (*q
 		slog.Int64("ts", ts.UnixMilli()),
 		slog.Int64("eval_window", r.evalWindow.Milliseconds()),
 		slog.Int64("eval_delay", r.evalDelay.Milliseconds()),
-		slog.String("rule.id", r.ID()),
 	)
 
 	startTs, endTs := r.Timestamps(ts)
@@ -199,7 +198,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
 			results = append(results, tsData)
 		} else {
 			// NOTE: should not happen but just to ensure we don't miss it if it happens for some reason
-			r.logger.WarnContext(ctx, "expected qbtypes.TimeSeriesData but got unexpected type", slog.String("rule.id", r.ID()), slog.String("item.type", reflect.TypeOf(item).String()))
+			r.logger.WarnContext(ctx, "expected qbtypes.TimeSeriesData but got unexpected type", slog.String("item.type", reflect.TypeOf(item).String()))
 		}
 	}
 
@@ -225,7 +224,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
 	var resultVector ruletypes.Vector
 
 	if queryResult == nil || len(queryResult.Aggregations) == 0 || queryResult.Aggregations[0] == nil {
-		r.logger.WarnContext(ctx, "query result is nil", slog.String("rule.id", r.ID()), slog.String("query.name", selectedQuery))
+		r.logger.WarnContext(ctx, "query result is nil", slog.String("query.name", selectedQuery))
 		return resultVector, nil
 	}
 
@@ -235,7 +234,7 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
 		filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
 		// In case of error we log the error and continue with the original series
 		if filterErr != nil {
-			r.logger.ErrorContext(ctx, "error filtering new series", slog.String("rule.id", r.ID()), errors.Attr(filterErr))
+			r.logger.ErrorContext(ctx, "error filtering new series", errors.Attr(filterErr))
 		} else {
 			seriesToProcess = filteredSeries
 		}
@@ -243,7 +242,11 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, orgID valuer.UUID,
 
 	for _, series := range seriesToProcess {
 		if !r.Condition().ShouldEval(series) {
-			r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", slog.String("rule.id", r.ID()), slog.Int("series.num_points", len(series.Values)), slog.Int("series.required_points", r.Condition().RequiredNumPoints))
+			r.logger.InfoContext(
+				ctx, "not enough data points to evaluate series, skipping",
+				slog.Int("series.num_points", len(series.Values)),
+				slog.Int("series.required_points", r.Condition().RequiredNumPoints),
+			)
 			continue
 		}
 		resultSeries, err := r.Threshold.Eval(series, r.Unit(), ruletypes.EvalData{
@@ -294,7 +297,10 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		value := valueFormatter.Format(smpl.V, r.Unit())
 		// todo(aniket): handle different threshold
 		threshold := valueFormatter.Format(smpl.Target, smpl.TargetUnit)
-		r.logger.DebugContext(ctx, "alert template data for rule", slog.String("rule.id", r.ID()), slog.String("formatter.name", valueFormatter.Name()), slog.String("alert.value", value), slog.String("alert.threshold", threshold))
+		r.logger.DebugContext(
+			ctx, "alert template data for rule", slog.String("formatter.name", valueFormatter.Name()),
+			slog.String("alert.value", value), slog.String("alert.threshold", threshold),
+		)
 
 		tmplData := ruletypes.AlertTemplateData(l, value, threshold)
 		// Inject some convenience variables that are easier to remember for users
@@ -313,7 +319,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		result, err := tmpl.Expand()
 		if err != nil {
 			result = fmt.Sprintf("<error expanding template: %s>", err)
-			r.logger.ErrorContext(ctx, "expanding alert template failed", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("alert.template_data", tmplData))
+			r.logger.ErrorContext(ctx, "expanding alert template failed", errors.Attr(err), slog.Any("alert.template_data", tmplData))
 		}
 		return result
 	}
@@ -345,13 +351,13 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		case ruletypes.AlertTypeTraces:
 			link := r.prepareLinksToTraces(ctx, ts, smpl.Metric)
 			if link != "" && r.hostFromSource() != "" {
-				r.logger.InfoContext(ctx, "adding traces link to annotations", slog.String("rule.id", r.ID()), slog.String("annotation.link", fmt.Sprintf("%s/traces-explorer?%s", r.hostFromSource(), link)))
+				r.logger.InfoContext(ctx, "adding traces link to annotations", slog.String("annotation.link", fmt.Sprintf("%s/traces-explorer?%s", r.hostFromSource(), link)))
 				annotations = append(annotations, ruletypes.Label{Name: "related_traces", Value: fmt.Sprintf("%s/traces-explorer?%s", r.hostFromSource(), link)})
 			}
 		case ruletypes.AlertTypeLogs:
 			link := r.prepareLinksToLogs(ctx, ts, smpl.Metric)
 			if link != "" && r.hostFromSource() != "" {
-				r.logger.InfoContext(ctx, "adding logs link to annotations", slog.String("rule.id", r.ID()), slog.String("annotation.link", fmt.Sprintf("%s/logs/logs-explorer?%s", r.hostFromSource(), link)))
+				r.logger.InfoContext(ctx, "adding logs link to annotations", slog.String("annotation.link", fmt.Sprintf("%s/logs/logs-explorer?%s", r.hostFromSource(), link)))
 				annotations = append(annotations, ruletypes.Label{Name: "related_logs", Value: fmt.Sprintf("%s/logs/logs-explorer?%s", r.hostFromSource(), link)})
 			}
 		}
@@ -378,7 +384,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		}
 	}
 
-	r.logger.InfoContext(ctx, "number of alerts found", slog.String("rule.id", r.ID()), slog.Int("alert.count", len(alerts)))
+	r.logger.InfoContext(ctx, "number of alerts found", slog.Int("alert.count", len(alerts)))
 
 	// alerts[h] is ready, add or update active list now
 	for h, a := range alerts {
@@ -406,7 +412,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 	for fp, a := range r.Active {
 		labelsJSON, err := json.Marshal(a.QueryResultLabels)
 		if err != nil {
-			r.logger.ErrorContext(ctx, "error marshaling labels", slog.String("rule.id", r.ID()), errors.Attr(err), slog.Any("alert.labels", a.Labels))
+			r.logger.ErrorContext(ctx, "error marshaling labels", errors.Attr(err), slog.Any("alert.labels", a.Labels))
 		}
 		if _, ok := resultFPs[fp]; !ok {
 			// If the alert was previously firing, keep it around for a given
@@ -415,7 +421,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 			delete(r.Active, fp)
 		}
 		if a.State != ruletypes.StateInactive {
-			r.logger.DebugContext(ctx, "converting firing alert to inactive", slog.String("rule.id", r.ID()))
+			r.logger.DebugContext(ctx, "converting firing alert to inactive")
 			a.State = ruletypes.StateInactive
 			a.ResolvedAt = ts
 			itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
@@ -433,7 +439,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 		}
 
 		if a.State == ruletypes.StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration.Duration() {
-			r.logger.DebugContext(ctx, "converting pending alert to firing", slog.String("rule.id", r.ID()))
+			r.logger.DebugContext(ctx, "converting pending alert to firing")
 			a.State = ruletypes.StateFiring
 			a.FiredAt = ts
 			state := ruletypes.StateFiring
@@ -463,7 +469,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (int, error) {
 			state = ruletypes.StateFiring
 		}
 		a.State = state
-		r.logger.DebugContext(ctx, "converting alert state", slog.String("rule.id", r.ID()), slog.Any("alert.state", state))
+		r.logger.DebugContext(ctx, "converting alert state", slog.Any("alert.state", state))
 		itemsToAdd = append(itemsToAdd, rulestatehistorytypes.RuleStateHistory{
 			RuleID:   r.ID(),
 			RuleName: r.Name(),
@@ -511,8 +511,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
 	}
 	telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
 
-	querier, mockMetadataStore := prepareQuerierForMetrics(t, telemetryStore)
-	mockMetadataStore.TypeMap["signoz_calls_total"] = metrictypes.SumType
+	querier := prepareQuerierForMetrics(t, telemetryStore)
 
 	cols := []cmock.ColumnType{
 		{Name: "ts", Type: "DateTime"},
@@ -728,8 +727,7 @@ func TestThresholdRuleNoData(t *testing.T) {
 		WithArgs(nil, nil, nil, nil, nil, nil, nil, nil).
 		WillReturnRows(rows)
 
-	querier, mockMetadataStore := prepareQuerierForMetrics(t, telemetryStore)
-	mockMetadataStore.TypeMap["signoz_calls_total"] = metrictypes.SumType
+	querier := prepareQuerierForMetrics(t, telemetryStore)
 
 	var target float64 = 0
 	postableRule.RuleCondition.Thresholds = &ruletypes.RuleThresholdData{
@@ -1117,8 +1115,7 @@ func TestMultipleThresholdRule(t *testing.T) {
 		WithArgs(nil, nil, nil, nil, nil, nil, nil, nil).
 		WillReturnRows(rows)
 
-	querier, mockMetadataStore := prepareQuerierForMetrics(t, telemetryStore)
-	mockMetadataStore.TypeMap["signoz_calls_total"] = metrictypes.SumType
+	querier := prepareQuerierForMetrics(t, telemetryStore)
 
 	postableRule.RuleCondition.CompareOperator = c.compareOperator
 	postableRule.RuleCondition.MatchType = c.matchType
@@ -1906,8 +1903,7 @@ func TestThresholdEval_RequireMinPoints(t *testing.T) {
 		WithArgs(nil, nil, nil, nil, nil, nil, nil, nil).
 		WillReturnRows(rows)
 
-	querier, mockMetadataStore := prepareQuerierForMetrics(t, telemetryStore)
-	mockMetadataStore.TypeMap["signoz_calls_total"] = metrictypes.SumType
+	querier := prepareQuerierForMetrics(t, telemetryStore)
 
 	rc := postableRule.RuleCondition
 	rc.Target = &c.target