mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-26 18:32:35 +00:00
Compare commits
1 Commits
fix/remove
...
multiple-t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e1947800b |
@@ -26,6 +26,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "evaluation is invalid: %v", err)
|
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "evaluation is invalid: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts.Rule.RuleType == ruletypes.RuleTypeThreshold {
|
if opts.Rule.RuleType == ruletypes.RuleTypeThreshold {
|
||||||
// create a threshold rule
|
// create a threshold rule
|
||||||
tr, err := baserules.NewThresholdRule(
|
tr, err := baserules.NewThresholdRule(
|
||||||
|
|||||||
@@ -74,6 +74,21 @@ type Alertmanager interface {
|
|||||||
CreateInhibitRules(ctx context.Context, orgID valuer.UUID, rules []amConfig.InhibitRule) error
|
CreateInhibitRules(ctx context.Context, orgID valuer.UUID, rules []amConfig.InhibitRule) error
|
||||||
DeleteAllInhibitRulesByRuleId(ctx context.Context, orgID valuer.UUID, ruleId string) error
|
DeleteAllInhibitRulesByRuleId(ctx context.Context, orgID valuer.UUID, ruleId string) error
|
||||||
|
|
||||||
|
// Planned Maintenance CRUD
|
||||||
|
GetAllPlannedMaintenance(ctx context.Context, orgID string) ([]*alertmanagertypes.GettablePlannedMaintenance, error)
|
||||||
|
GetPlannedMaintenanceByID(ctx context.Context, id valuer.UUID) (*alertmanagertypes.GettablePlannedMaintenance, error)
|
||||||
|
CreatePlannedMaintenance(ctx context.Context, maintenance alertmanagertypes.GettablePlannedMaintenance) (valuer.UUID, error)
|
||||||
|
EditPlannedMaintenance(ctx context.Context, maintenance alertmanagertypes.GettablePlannedMaintenance, id valuer.UUID) error
|
||||||
|
DeletePlannedMaintenance(ctx context.Context, id valuer.UUID) error
|
||||||
|
|
||||||
|
// Rule State History
|
||||||
|
RecordRuleStateHistory(ctx context.Context, orgID string, entries []alertmanagertypes.RuleStateHistory) error
|
||||||
|
GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]alertmanagertypes.RuleStateHistory, error)
|
||||||
|
GetRuleStateHistoryTimeline(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStateTimeline, error)
|
||||||
|
GetRuleStateHistoryTopContributors(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateHistoryContributor, error)
|
||||||
|
GetOverallStateTransitions(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateTransition, error)
|
||||||
|
GetRuleStats(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStats, error)
|
||||||
|
|
||||||
// Collects stats for the organization.
|
// Collects stats for the organization.
|
||||||
statsreporter.StatsCollector
|
statsreporter.StatsCollector
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -68,20 +68,30 @@ type Server struct {
|
|||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
stopc chan struct{}
|
stopc chan struct{}
|
||||||
notificationManager nfmanager.NotificationManager
|
notificationManager nfmanager.NotificationManager
|
||||||
|
|
||||||
|
// maintenanceExprMuter is an optional muter for expression-based maintenance scoping
|
||||||
|
maintenanceExprMuter types.Muter
|
||||||
|
// muteStageMetrics are created once and reused across SetConfig calls
|
||||||
|
muteStageMetrics *notify.Metrics
|
||||||
|
|
||||||
|
// signozRegisterer is used for metrics in the pipeline
|
||||||
|
signozRegisterer prometheus.Registerer
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore, nfManager nfmanager.NotificationManager) (*Server, error) {
|
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore, nfManager nfmanager.NotificationManager, maintenanceExprMuter types.Muter) (*Server, error) {
|
||||||
server := &Server{
|
server := &Server{
|
||||||
logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"),
|
logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver"),
|
||||||
registry: registry,
|
registry: registry,
|
||||||
srvConfig: srvConfig,
|
srvConfig: srvConfig,
|
||||||
orgID: orgID,
|
orgID: orgID,
|
||||||
stateStore: stateStore,
|
stateStore: stateStore,
|
||||||
stopc: make(chan struct{}),
|
stopc: make(chan struct{}),
|
||||||
notificationManager: nfManager,
|
notificationManager: nfManager,
|
||||||
|
maintenanceExprMuter: maintenanceExprMuter,
|
||||||
}
|
}
|
||||||
signozRegisterer := prometheus.WrapRegistererWithPrefix("signoz_", registry)
|
server.signozRegisterer = prometheus.WrapRegistererWithPrefix("signoz_", registry)
|
||||||
signozRegisterer = prometheus.WrapRegistererWith(prometheus.Labels{"org_id": server.orgID}, signozRegisterer)
|
server.signozRegisterer = prometheus.WrapRegistererWith(prometheus.Labels{"org_id": server.orgID}, server.signozRegisterer)
|
||||||
|
signozRegisterer := server.signozRegisterer
|
||||||
// initialize marker
|
// initialize marker
|
||||||
server.marker = alertmanagertypes.NewMarker(signozRegisterer)
|
server.marker = alertmanagertypes.NewMarker(signozRegisterer)
|
||||||
|
|
||||||
@@ -198,6 +208,11 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
|
|||||||
server.pipelineBuilder = notify.NewPipelineBuilder(signozRegisterer, featurecontrol.NoopFlags{})
|
server.pipelineBuilder = notify.NewPipelineBuilder(signozRegisterer, featurecontrol.NoopFlags{})
|
||||||
server.dispatcherMetrics = NewDispatcherMetrics(false, signozRegisterer)
|
server.dispatcherMetrics = NewDispatcherMetrics(false, signozRegisterer)
|
||||||
|
|
||||||
|
if server.maintenanceExprMuter != nil {
|
||||||
|
muteRegisterer := prometheus.WrapRegistererWithPrefix("maintenance_mute_", signozRegisterer)
|
||||||
|
server.muteStageMetrics = notify.NewMetrics(muteRegisterer, featurecontrol.NoopFlags{})
|
||||||
|
}
|
||||||
|
|
||||||
return server, nil
|
return server, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -205,6 +220,9 @@ func (server *Server) GetAlerts(ctx context.Context, params alertmanagertypes.Ge
|
|||||||
return alertmanagertypes.NewGettableAlertsFromAlertProvider(server.alerts, server.alertmanagerConfig, server.marker.Status, func(labels model.LabelSet) {
|
return alertmanagertypes.NewGettableAlertsFromAlertProvider(server.alerts, server.alertmanagerConfig, server.marker.Status, func(labels model.LabelSet) {
|
||||||
server.inhibitor.Mutes(labels)
|
server.inhibitor.Mutes(labels)
|
||||||
server.silencer.Mutes(labels)
|
server.silencer.Mutes(labels)
|
||||||
|
if server.maintenanceExprMuter != nil {
|
||||||
|
server.maintenanceExprMuter.Mutes(labels)
|
||||||
|
}
|
||||||
}, params)
|
}, params)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -293,6 +311,14 @@ func (server *Server) SetConfig(ctx context.Context, alertmanagerConfig *alertma
|
|||||||
pipelinePeer,
|
pipelinePeer,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Inject expression-based maintenance muter into the pipeline
|
||||||
|
if server.maintenanceExprMuter != nil {
|
||||||
|
ms := notify.NewMuteStage(server.maintenanceExprMuter, server.muteStageMetrics)
|
||||||
|
for name, stage := range pipeline {
|
||||||
|
pipeline[name] = notify.MultiStage{ms, stage}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
timeoutFunc := func(d time.Duration) time.Duration {
|
timeoutFunc := func(d time.Duration) time.Duration {
|
||||||
if d < notify.MinTimeout {
|
if d < notify.MinTimeout {
|
||||||
d = notify.MinTimeout
|
d = notify.MinTimeout
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -22,9 +23,35 @@ import (
|
|||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// testMuter implements types.Muter for testing maintenance expression muting.
|
||||||
|
type testMuter struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
muteFunc func(model.LabelSet) bool
|
||||||
|
calls []model.LabelSet
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testMuter) Mutes(labels model.LabelSet) bool {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.calls = append(m.calls, labels)
|
||||||
|
if m.muteFunc != nil {
|
||||||
|
return m.muteFunc(labels)
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testMuter) getCalls() []model.LabelSet {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
result := make([]model.LabelSet, len(m.calls))
|
||||||
|
copy(result, m.calls)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
func TestEndToEndAlertManagerFlow(t *testing.T) {
|
func TestEndToEndAlertManagerFlow(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
providerSettings := instrumentationtest.New().ToProviderSettings()
|
providerSettings := instrumentationtest.New().ToProviderSettings()
|
||||||
@@ -90,7 +117,7 @@ func TestEndToEndAlertManagerFlow(t *testing.T) {
|
|||||||
stateStore := alertmanagertypestest.NewStateStore()
|
stateStore := alertmanagertypestest.NewStateStore()
|
||||||
registry := prometheus.NewRegistry()
|
registry := prometheus.NewRegistry()
|
||||||
logger := slog.New(slog.DiscardHandler)
|
logger := slog.New(slog.DiscardHandler)
|
||||||
server, err := New(context.Background(), logger, registry, srvCfg, orgID, stateStore, notificationManager)
|
server, err := New(context.Background(), logger, registry, srvCfg, orgID, stateStore, notificationManager, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, orgID)
|
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, orgID)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -221,3 +248,257 @@ func TestEndToEndAlertManagerFlow(t *testing.T) {
|
|||||||
require.Equal(t, "{__receiver__=\"webhook\"}:{cluster=\"prod-cluster\", instance=\"server-03\", ruleId=\"high-cpu-usage\"}", alertGroups[2].GroupKey)
|
require.Equal(t, "{__receiver__=\"webhook\"}:{cluster=\"prod-cluster\", instance=\"server-03\", ruleId=\"high-cpu-usage\"}", alertGroups[2].GroupKey)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestEndToEndMaintenanceMuting verifies that the maintenance expression muter
|
||||||
|
// integrates correctly with the alertmanager server pipeline:
|
||||||
|
// 1. MuteStage is injected into the notification pipeline when a muter is provided
|
||||||
|
// 2. Alerts remain visible in GetAlerts during maintenance (muting suppresses
|
||||||
|
// notifications, not alert visibility)
|
||||||
|
// 3. The muter is called during GetAlerts for status resolution
|
||||||
|
func TestEndToEndMaintenanceMuting(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
providerSettings := instrumentationtest.New().ToProviderSettings()
|
||||||
|
|
||||||
|
store := nfroutingstoretest.NewMockSQLRouteStore()
|
||||||
|
store.MatchExpectationsInOrder(false)
|
||||||
|
notificationManager, err := rulebasednotification.New(ctx, providerSettings, nfmanager.Config{}, store)
|
||||||
|
require.NoError(t, err)
|
||||||
|
orgID := "test-org-maintenance"
|
||||||
|
|
||||||
|
// Create a muter that mutes alerts with severity == "critical"
|
||||||
|
muter := &testMuter{
|
||||||
|
muteFunc: func(labels model.LabelSet) bool {
|
||||||
|
return string(labels["severity"]) == "critical"
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
srvCfg := NewConfig()
|
||||||
|
stateStore := alertmanagertypestest.NewStateStore()
|
||||||
|
registry := prometheus.NewRegistry()
|
||||||
|
logger := slog.New(slog.DiscardHandler)
|
||||||
|
|
||||||
|
// Create server WITH the maintenance muter
|
||||||
|
server, err := New(ctx, logger, registry, srvCfg, orgID, stateStore, notificationManager, muter)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, orgID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = server.SetConfig(ctx, amConfig)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Put a mix of alerts: 2 critical (should be muted) and 1 warning (should not)
|
||||||
|
now := time.Now()
|
||||||
|
testAlerts := []*alertmanagertypes.PostableAlert{
|
||||||
|
{
|
||||||
|
Alert: alertmanagertypes.AlertModel{
|
||||||
|
Labels: map[string]string{
|
||||||
|
"ruleId": "disk-usage",
|
||||||
|
"severity": "critical",
|
||||||
|
"env": "prod",
|
||||||
|
"alertname": "DiskUsageHigh",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Annotations: map[string]string{"summary": "Disk usage critical"},
|
||||||
|
StartsAt: strfmt.DateTime(now.Add(-5 * time.Minute)),
|
||||||
|
EndsAt: strfmt.DateTime(time.Time{}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Alert: alertmanagertypes.AlertModel{
|
||||||
|
Labels: map[string]string{
|
||||||
|
"ruleId": "disk-usage",
|
||||||
|
"severity": "warning",
|
||||||
|
"env": "prod",
|
||||||
|
"alertname": "DiskUsageHigh",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Annotations: map[string]string{"summary": "Disk usage warning"},
|
||||||
|
StartsAt: strfmt.DateTime(now.Add(-3 * time.Minute)),
|
||||||
|
EndsAt: strfmt.DateTime(time.Time{}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Alert: alertmanagertypes.AlertModel{
|
||||||
|
Labels: map[string]string{
|
||||||
|
"ruleId": "memory-usage",
|
||||||
|
"severity": "critical",
|
||||||
|
"env": "staging",
|
||||||
|
"alertname": "MemoryUsageHigh",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Annotations: map[string]string{"summary": "Memory usage critical"},
|
||||||
|
StartsAt: strfmt.DateTime(now.Add(-2 * time.Minute)),
|
||||||
|
EndsAt: strfmt.DateTime(time.Time{}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
err = server.PutAlerts(ctx, testAlerts)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
t.Run("alerts_visible_during_maintenance", func(t *testing.T) {
|
||||||
|
// Maintenance muting suppresses notifications, NOT alert visibility.
|
||||||
|
// All 3 alerts should still be returned by GetAlerts.
|
||||||
|
req, err := http.NewRequest(http.MethodGet, "/alerts", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
params, err := alertmanagertypes.NewGettableAlertsParams(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
alerts, err := server.GetAlerts(ctx, params)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, alerts, 3, "All alerts should be visible during maintenance")
|
||||||
|
|
||||||
|
// Verify labels are intact
|
||||||
|
severities := map[string]int{}
|
||||||
|
for _, alert := range alerts {
|
||||||
|
severities[alert.Alert.Labels["severity"]]++
|
||||||
|
}
|
||||||
|
assert.Equal(t, 2, severities["critical"])
|
||||||
|
assert.Equal(t, 1, severities["warning"])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("muter_called_during_get_alerts", func(t *testing.T) {
|
||||||
|
// The muter should have been called for each alert during GetAlerts.
|
||||||
|
calls := muter.getCalls()
|
||||||
|
assert.GreaterOrEqual(t, len(calls), 3, "Muter should be called for each alert")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("muter_correctly_identifies_targets", func(t *testing.T) {
|
||||||
|
// Verify the muter returns correct results for different label sets
|
||||||
|
assert.True(t, muter.Mutes(model.LabelSet{"severity": "critical", "env": "prod"}),
|
||||||
|
"Should mute critical alerts")
|
||||||
|
assert.False(t, muter.Mutes(model.LabelSet{"severity": "warning", "env": "prod"}),
|
||||||
|
"Should not mute warning alerts")
|
||||||
|
assert.True(t, muter.Mutes(model.LabelSet{"severity": "critical", "env": "staging"}),
|
||||||
|
"Should mute critical regardless of env")
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEndToEndMaintenanceCatchAll verifies that a catch-all muter (always returns true)
|
||||||
|
// mutes all alerts while keeping them visible.
|
||||||
|
func TestEndToEndMaintenanceCatchAll(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
providerSettings := instrumentationtest.New().ToProviderSettings()
|
||||||
|
|
||||||
|
store := nfroutingstoretest.NewMockSQLRouteStore()
|
||||||
|
store.MatchExpectationsInOrder(false)
|
||||||
|
notificationManager, err := rulebasednotification.New(ctx, providerSettings, nfmanager.Config{}, store)
|
||||||
|
require.NoError(t, err)
|
||||||
|
orgID := "test-org-catchall"
|
||||||
|
|
||||||
|
// Catch-all muter: mutes everything
|
||||||
|
muter := &testMuter{
|
||||||
|
muteFunc: func(labels model.LabelSet) bool {
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
srvCfg := NewConfig()
|
||||||
|
stateStore := alertmanagertypestest.NewStateStore()
|
||||||
|
registry := prometheus.NewRegistry()
|
||||||
|
logger := slog.New(slog.DiscardHandler)
|
||||||
|
|
||||||
|
server, err := New(ctx, logger, registry, srvCfg, orgID, stateStore, notificationManager, muter)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, orgID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = server.SetConfig(ctx, amConfig)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
testAlerts := []*alertmanagertypes.PostableAlert{
|
||||||
|
{
|
||||||
|
Alert: alertmanagertypes.AlertModel{
|
||||||
|
Labels: map[string]string{
|
||||||
|
"ruleId": "rule-1", "alertname": "Alert1", "env": "prod",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
StartsAt: strfmt.DateTime(now.Add(-1 * time.Minute)),
|
||||||
|
EndsAt: strfmt.DateTime(time.Time{}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Alert: alertmanagertypes.AlertModel{
|
||||||
|
Labels: map[string]string{
|
||||||
|
"ruleId": "rule-2", "alertname": "Alert2", "env": "staging",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
StartsAt: strfmt.DateTime(now.Add(-1 * time.Minute)),
|
||||||
|
EndsAt: strfmt.DateTime(time.Time{}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
err = server.PutAlerts(ctx, testAlerts)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, "/alerts", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
params, err := alertmanagertypes.NewGettableAlertsParams(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
alerts, err := server.GetAlerts(ctx, params)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, alerts, 2, "All alerts should remain visible even when catch-all muter is active")
|
||||||
|
|
||||||
|
// Verify the muter was called for each alert
|
||||||
|
calls := muter.getCalls()
|
||||||
|
assert.GreaterOrEqual(t, len(calls), 2, "Muter should be called for each alert")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEndToEndNoMuter verifies the server works correctly without a muter (nil),
|
||||||
|
// matching the existing behavior where no maintenance muting is configured.
|
||||||
|
func TestEndToEndNoMuter(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
providerSettings := instrumentationtest.New().ToProviderSettings()
|
||||||
|
|
||||||
|
store := nfroutingstoretest.NewMockSQLRouteStore()
|
||||||
|
store.MatchExpectationsInOrder(false)
|
||||||
|
notificationManager, err := rulebasednotification.New(ctx, providerSettings, nfmanager.Config{}, store)
|
||||||
|
require.NoError(t, err)
|
||||||
|
orgID := "test-org-nomuter"
|
||||||
|
|
||||||
|
srvCfg := NewConfig()
|
||||||
|
stateStore := alertmanagertypestest.NewStateStore()
|
||||||
|
registry := prometheus.NewRegistry()
|
||||||
|
logger := slog.New(slog.DiscardHandler)
|
||||||
|
|
||||||
|
// Create server WITHOUT a muter (nil)
|
||||||
|
server, err := New(ctx, logger, registry, srvCfg, orgID, stateStore, notificationManager, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, orgID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = server.SetConfig(ctx, amConfig)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
testAlerts := []*alertmanagertypes.PostableAlert{
|
||||||
|
{
|
||||||
|
Alert: alertmanagertypes.AlertModel{
|
||||||
|
Labels: map[string]string{
|
||||||
|
"ruleId": "rule-1", "alertname": "Alert1", "severity": "critical",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
StartsAt: strfmt.DateTime(now.Add(-1 * time.Minute)),
|
||||||
|
EndsAt: strfmt.DateTime(time.Time{}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
err = server.PutAlerts(ctx, testAlerts)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, "/alerts", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
params, err := alertmanagertypes.NewGettableAlertsParams(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
alerts, err := server.GetAlerts(ctx, params)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, alerts, 1, "Alert should be returned when no muter is configured")
|
||||||
|
assert.Equal(t, "critical", alerts[0].Alert.Labels["severity"])
|
||||||
|
}
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ import (
|
|||||||
|
|
||||||
func TestServerSetConfigAndStop(t *testing.T) {
|
func TestServerSetConfigAndStop(t *testing.T) {
|
||||||
notificationManager := nfmanagertest.NewMock()
|
notificationManager := nfmanagertest.NewMock()
|
||||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager)
|
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
|
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
|
||||||
@@ -37,7 +37,7 @@ func TestServerSetConfigAndStop(t *testing.T) {
|
|||||||
|
|
||||||
func TestServerTestReceiverTypeWebhook(t *testing.T) {
|
func TestServerTestReceiverTypeWebhook(t *testing.T) {
|
||||||
notificationManager := nfmanagertest.NewMock()
|
notificationManager := nfmanagertest.NewMock()
|
||||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager)
|
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
|
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
|
||||||
@@ -85,7 +85,7 @@ func TestServerPutAlerts(t *testing.T) {
|
|||||||
srvCfg := NewConfig()
|
srvCfg := NewConfig()
|
||||||
srvCfg.Route.GroupInterval = 1 * time.Second
|
srvCfg.Route.GroupInterval = 1 * time.Second
|
||||||
notificationManager := nfmanagertest.NewMock()
|
notificationManager := nfmanagertest.NewMock()
|
||||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager)
|
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
|
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
|
||||||
@@ -133,7 +133,7 @@ func TestServerTestAlert(t *testing.T) {
|
|||||||
srvCfg := NewConfig()
|
srvCfg := NewConfig()
|
||||||
srvCfg.Route.GroupInterval = 1 * time.Second
|
srvCfg.Route.GroupInterval = 1 * time.Second
|
||||||
notificationManager := nfmanagertest.NewMock()
|
notificationManager := nfmanagertest.NewMock()
|
||||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager)
|
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
|
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
|
||||||
@@ -238,7 +238,7 @@ func TestServerTestAlertContinuesOnFailure(t *testing.T) {
|
|||||||
srvCfg := NewConfig()
|
srvCfg := NewConfig()
|
||||||
srvCfg.Route.GroupInterval = 1 * time.Second
|
srvCfg.Route.GroupInterval = 1 * time.Second
|
||||||
notificationManager := nfmanagertest.NewMock()
|
notificationManager := nfmanagertest.NewMock()
|
||||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager)
|
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
|
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
|
||||||
|
|||||||
@@ -0,0 +1,531 @@
|
|||||||
|
package clickhousealertmanagerstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/ClickHouse/clickhouse-go/v2"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||||
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
signozHistoryDBName = "signoz_analytics"
|
||||||
|
ruleStateHistoryTableName = "distributed_rule_state_history_v2"
|
||||||
|
|
||||||
|
maxPointsInTimeSeries = 300
|
||||||
|
)
|
||||||
|
|
||||||
|
type stateHistoryStore struct {
|
||||||
|
conn clickhouse.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStateHistoryStore(conn clickhouse.Conn) alertmanagertypes.StateHistoryStore {
|
||||||
|
return &stateHistoryStore{conn: conn}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateHistoryStore) WriteRuleStateHistory(ctx context.Context, entries []alertmanagertypes.RuleStateHistory) error {
|
||||||
|
if len(entries) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
statement, err := s.conn.PrepareBatch(ctx, fmt.Sprintf(
|
||||||
|
"INSERT INTO %s.%s (org_id, rule_id, rule_name, overall_state, overall_state_changed, state, state_changed, unix_milli, labels, fingerprint, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer statement.Abort()
|
||||||
|
|
||||||
|
for _, h := range entries {
|
||||||
|
if err := statement.Append(
|
||||||
|
h.OrgID,
|
||||||
|
h.RuleID, h.RuleName,
|
||||||
|
h.OverallState, h.OverallStateChanged,
|
||||||
|
h.State, h.StateChanged,
|
||||||
|
h.UnixMilli, h.Labels,
|
||||||
|
h.Fingerprint, h.Value,
|
||||||
|
); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return statement.Send()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateHistoryStore) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]alertmanagertypes.RuleStateHistory, error) {
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
"SELECT org_id, rule_id, rule_name, overall_state, overall_state_changed, state, state_changed, unix_milli, labels, fingerprint, value FROM %s.%s WHERE rule_id = '%s' AND state_changed = true ORDER BY unix_milli DESC LIMIT 1 BY fingerprint",
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, ruleID)
|
||||||
|
|
||||||
|
rows, err := s.conn.Query(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var results []alertmanagertypes.RuleStateHistory
|
||||||
|
for rows.Next() {
|
||||||
|
var h alertmanagertypes.RuleStateHistory
|
||||||
|
if err := rows.Scan(
|
||||||
|
&h.OrgID,
|
||||||
|
&h.RuleID, &h.RuleName,
|
||||||
|
&h.OverallState, &h.OverallStateChanged,
|
||||||
|
&h.State, &h.StateChanged,
|
||||||
|
&h.UnixMilli, &h.Labels,
|
||||||
|
&h.Fingerprint, &h.Value,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
results = append(results, h)
|
||||||
|
}
|
||||||
|
|
||||||
|
return results, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateHistoryStore) GetRuleStateHistoryTimeline(
|
||||||
|
ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory,
|
||||||
|
) (*alertmanagertypes.RuleStateTimeline, error) {
|
||||||
|
var conditions []string
|
||||||
|
|
||||||
|
conditions = append(conditions, fmt.Sprintf("org_id = '%s'", orgID))
|
||||||
|
conditions = append(conditions, fmt.Sprintf("rule_id = '%s'", ruleID))
|
||||||
|
conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", params.Start, params.End))
|
||||||
|
|
||||||
|
if params.State.StringValue() != "" {
|
||||||
|
conditions = append(conditions, fmt.Sprintf("state = '%s'", params.State.StringValue()))
|
||||||
|
}
|
||||||
|
|
||||||
|
whereClause := strings.Join(conditions, " AND ")
|
||||||
|
|
||||||
|
// Main query — paginated results.
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
"SELECT org_id, rule_id, rule_name, overall_state, overall_state_changed, state, state_changed, unix_milli, labels, fingerprint, value FROM %s.%s WHERE %s ORDER BY unix_milli %s LIMIT %d OFFSET %d",
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, whereClause, params.Order.StringValue(), params.Limit, params.Offset)
|
||||||
|
|
||||||
|
rows, err := s.conn.Query(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var items []alertmanagertypes.RuleStateHistory
|
||||||
|
for rows.Next() {
|
||||||
|
var h alertmanagertypes.RuleStateHistory
|
||||||
|
if err := rows.Scan(
|
||||||
|
&h.OrgID,
|
||||||
|
&h.RuleID, &h.RuleName,
|
||||||
|
&h.OverallState, &h.OverallStateChanged,
|
||||||
|
&h.State, &h.StateChanged,
|
||||||
|
&h.UnixMilli, &h.Labels,
|
||||||
|
&h.Fingerprint, &h.Value,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
items = append(items, h)
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Count query.
|
||||||
|
var total uint64
|
||||||
|
countQuery := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s",
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, whereClause)
|
||||||
|
if err := s.conn.QueryRow(ctx, countQuery).Scan(&total); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Labels query — distinct labels for the rule.
|
||||||
|
labelsQuery := fmt.Sprintf("SELECT DISTINCT labels FROM %s.%s WHERE org_id = '%s' AND rule_id = '%s'",
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID)
|
||||||
|
labelRows, err := s.conn.Query(ctx, labelsQuery)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer labelRows.Close()
|
||||||
|
|
||||||
|
labelsMap := make(map[string][]string)
|
||||||
|
for labelRows.Next() {
|
||||||
|
var rawLabel string
|
||||||
|
if err := labelRows.Scan(&rawLabel); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
label := map[string]string{}
|
||||||
|
if err := json.Unmarshal([]byte(rawLabel), &label); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for k, v := range label {
|
||||||
|
labelsMap[k] = append(labelsMap[k], v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if items == nil {
|
||||||
|
items = []alertmanagertypes.RuleStateHistory{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &alertmanagertypes.RuleStateTimeline{
|
||||||
|
Items: items,
|
||||||
|
Total: total,
|
||||||
|
Labels: labelsMap,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateHistoryStore) GetRuleStateHistoryTopContributors(
|
||||||
|
ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory,
|
||||||
|
) ([]alertmanagertypes.RuleStateHistoryContributor, error) {
|
||||||
|
query := fmt.Sprintf(`SELECT
|
||||||
|
fingerprint,
|
||||||
|
any(labels) as labels,
|
||||||
|
count(*) as count
|
||||||
|
FROM %s.%s
|
||||||
|
WHERE org_id = '%s' AND rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d
|
||||||
|
GROUP BY fingerprint
|
||||||
|
HAVING labels != '{}'
|
||||||
|
ORDER BY count DESC`,
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.Start, params.End)
|
||||||
|
|
||||||
|
rows, err := s.conn.Query(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var contributors []alertmanagertypes.RuleStateHistoryContributor
|
||||||
|
for rows.Next() {
|
||||||
|
var c alertmanagertypes.RuleStateHistoryContributor
|
||||||
|
if err := rows.Scan(&c.Fingerprint, &c.Labels, &c.Count); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
contributors = append(contributors, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
if contributors == nil {
|
||||||
|
contributors = []alertmanagertypes.RuleStateHistoryContributor{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return contributors, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateHistoryStore) GetOverallStateTransitions(
|
||||||
|
ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory,
|
||||||
|
) ([]alertmanagertypes.RuleStateTransition, error) {
|
||||||
|
tmpl := `WITH firing_events AS (
|
||||||
|
SELECT
|
||||||
|
rule_id,
|
||||||
|
state,
|
||||||
|
unix_milli AS firing_time
|
||||||
|
FROM %s.%s
|
||||||
|
WHERE overall_state = 'firing'
|
||||||
|
AND overall_state_changed = true
|
||||||
|
AND org_id = '%s'
|
||||||
|
AND rule_id = '%s'
|
||||||
|
AND unix_milli >= %d AND unix_milli <= %d
|
||||||
|
),
|
||||||
|
resolution_events AS (
|
||||||
|
SELECT
|
||||||
|
rule_id,
|
||||||
|
state,
|
||||||
|
unix_milli AS resolution_time
|
||||||
|
FROM %s.%s
|
||||||
|
WHERE overall_state = 'inactive'
|
||||||
|
AND overall_state_changed = true
|
||||||
|
AND org_id = '%s'
|
||||||
|
AND rule_id = '%s'
|
||||||
|
AND unix_milli >= %d AND unix_milli <= %d
|
||||||
|
),
|
||||||
|
matched_events AS (
|
||||||
|
SELECT
|
||||||
|
f.rule_id,
|
||||||
|
f.state,
|
||||||
|
f.firing_time,
|
||||||
|
MIN(r.resolution_time) AS resolution_time
|
||||||
|
FROM firing_events f
|
||||||
|
LEFT JOIN resolution_events r
|
||||||
|
ON f.rule_id = r.rule_id
|
||||||
|
WHERE r.resolution_time > f.firing_time
|
||||||
|
GROUP BY f.rule_id, f.state, f.firing_time
|
||||||
|
)
|
||||||
|
SELECT *
|
||||||
|
FROM matched_events
|
||||||
|
ORDER BY firing_time ASC;`
|
||||||
|
|
||||||
|
query := fmt.Sprintf(tmpl,
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.Start, params.End,
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.Start, params.End)
|
||||||
|
|
||||||
|
type transition struct {
|
||||||
|
RuleID string `ch:"rule_id"`
|
||||||
|
State string `ch:"state"`
|
||||||
|
FiringTime int64 `ch:"firing_time"`
|
||||||
|
ResolutionTime int64 `ch:"resolution_time"`
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := s.conn.Query(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var transitions []transition
|
||||||
|
for rows.Next() {
|
||||||
|
var t transition
|
||||||
|
if err := rows.Scan(&t.RuleID, &t.State, &t.FiringTime, &t.ResolutionTime); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
transitions = append(transitions, t)
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var stateItems []alertmanagertypes.RuleStateTransition
|
||||||
|
|
||||||
|
for idx, item := range transitions {
|
||||||
|
stateItems = append(stateItems, alertmanagertypes.RuleStateTransition{
|
||||||
|
State: alertmanagertypes.AlertState{String: valuer.NewString(item.State)},
|
||||||
|
Start: item.FiringTime,
|
||||||
|
End: item.ResolutionTime,
|
||||||
|
})
|
||||||
|
if idx < len(transitions)-1 {
|
||||||
|
nextStart := transitions[idx+1].FiringTime
|
||||||
|
if nextStart > item.ResolutionTime {
|
||||||
|
stateItems = append(stateItems, alertmanagertypes.RuleStateTransition{
|
||||||
|
State: alertmanagertypes.AlertStateInactive,
|
||||||
|
Start: item.ResolutionTime,
|
||||||
|
End: nextStart,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the most recent state to fill in edges.
|
||||||
|
var lastStateStr string
|
||||||
|
stateQuery := fmt.Sprintf(
|
||||||
|
"SELECT state FROM %s.%s WHERE org_id = '%s' AND rule_id = '%s' AND unix_milli <= %d ORDER BY unix_milli DESC LIMIT 1",
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.End)
|
||||||
|
if err := s.conn.QueryRow(ctx, stateQuery).Scan(&lastStateStr); err != nil {
|
||||||
|
lastStateStr = "inactive"
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(transitions) == 0 {
|
||||||
|
stateItems = append(stateItems, alertmanagertypes.RuleStateTransition{
|
||||||
|
State: alertmanagertypes.AlertState{String: valuer.NewString(lastStateStr)},
|
||||||
|
Start: params.Start,
|
||||||
|
End: params.End,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
if lastStateStr == "inactive" {
|
||||||
|
stateItems = append(stateItems, alertmanagertypes.RuleStateTransition{
|
||||||
|
State: alertmanagertypes.AlertStateInactive,
|
||||||
|
Start: transitions[len(transitions)-1].ResolutionTime,
|
||||||
|
End: params.End,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
// Find the most recent firing event.
|
||||||
|
var firingTime int64
|
||||||
|
firingQuery := fmt.Sprintf(
|
||||||
|
"SELECT unix_milli FROM %s.%s WHERE org_id = '%s' AND rule_id = '%s' AND overall_state_changed = true AND overall_state = 'firing' AND unix_milli <= %d ORDER BY unix_milli DESC LIMIT 1",
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.End)
|
||||||
|
if err := s.conn.QueryRow(ctx, firingQuery).Scan(&firingTime); err != nil {
|
||||||
|
firingTime = transitions[len(transitions)-1].ResolutionTime
|
||||||
|
}
|
||||||
|
stateItems = append(stateItems, alertmanagertypes.RuleStateTransition{
|
||||||
|
State: alertmanagertypes.AlertStateInactive,
|
||||||
|
Start: transitions[len(transitions)-1].ResolutionTime,
|
||||||
|
End: firingTime,
|
||||||
|
})
|
||||||
|
stateItems = append(stateItems, alertmanagertypes.RuleStateTransition{
|
||||||
|
State: alertmanagertypes.AlertStateFiring,
|
||||||
|
Start: firingTime,
|
||||||
|
End: params.End,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return stateItems, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateHistoryStore) GetTotalTriggers(
|
||||||
|
ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory,
|
||||||
|
) (uint64, error) {
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
"SELECT count(*) FROM %s.%s WHERE org_id = '%s' AND rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d",
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.Start, params.End)
|
||||||
|
|
||||||
|
var total uint64
|
||||||
|
if err := s.conn.QueryRow(ctx, query).Scan(&total); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return total, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateHistoryStore) GetTriggersByInterval(
|
||||||
|
ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory,
|
||||||
|
) (*alertmanagertypes.Series, error) {
|
||||||
|
step := minAllowedStepInterval(params.Start, params.End)
|
||||||
|
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
"SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE org_id = '%s' AND rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC",
|
||||||
|
step, signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.Start, params.End)
|
||||||
|
|
||||||
|
return s.queryTimeSeries(ctx, query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateHistoryStore) GetAvgResolutionTime(
|
||||||
|
ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory,
|
||||||
|
) (float64, error) {
|
||||||
|
tmpl := `
|
||||||
|
WITH firing_events AS (
|
||||||
|
SELECT
|
||||||
|
rule_id,
|
||||||
|
state,
|
||||||
|
unix_milli AS firing_time
|
||||||
|
FROM %s.%s
|
||||||
|
WHERE overall_state = 'firing'
|
||||||
|
AND overall_state_changed = true
|
||||||
|
AND org_id = '%s'
|
||||||
|
AND rule_id = '%s'
|
||||||
|
AND unix_milli >= %d AND unix_milli <= %d
|
||||||
|
),
|
||||||
|
resolution_events AS (
|
||||||
|
SELECT
|
||||||
|
rule_id,
|
||||||
|
state,
|
||||||
|
unix_milli AS resolution_time
|
||||||
|
FROM %s.%s
|
||||||
|
WHERE overall_state = 'inactive'
|
||||||
|
AND overall_state_changed = true
|
||||||
|
AND org_id = '%s'
|
||||||
|
AND rule_id = '%s'
|
||||||
|
AND unix_milli >= %d AND unix_milli <= %d
|
||||||
|
),
|
||||||
|
matched_events AS (
|
||||||
|
SELECT
|
||||||
|
f.rule_id,
|
||||||
|
f.state,
|
||||||
|
f.firing_time,
|
||||||
|
MIN(r.resolution_time) AS resolution_time
|
||||||
|
FROM firing_events f
|
||||||
|
LEFT JOIN resolution_events r
|
||||||
|
ON f.rule_id = r.rule_id
|
||||||
|
WHERE r.resolution_time > f.firing_time
|
||||||
|
GROUP BY f.rule_id, f.state, f.firing_time
|
||||||
|
)
|
||||||
|
SELECT AVG(resolution_time - firing_time) / 1000 AS avg_resolution_time
|
||||||
|
FROM matched_events;`
|
||||||
|
|
||||||
|
query := fmt.Sprintf(tmpl,
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.Start, params.End,
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.Start, params.End)
|
||||||
|
|
||||||
|
var avgResolutionTime float64
|
||||||
|
if err := s.conn.QueryRow(ctx, query).Scan(&avgResolutionTime); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return avgResolutionTime, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateHistoryStore) GetAvgResolutionTimeByInterval(
|
||||||
|
ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory,
|
||||||
|
) (*alertmanagertypes.Series, error) {
|
||||||
|
step := minAllowedStepInterval(params.Start, params.End)
|
||||||
|
|
||||||
|
tmpl := `
|
||||||
|
WITH firing_events AS (
|
||||||
|
SELECT
|
||||||
|
rule_id,
|
||||||
|
state,
|
||||||
|
unix_milli AS firing_time
|
||||||
|
FROM %s.%s
|
||||||
|
WHERE overall_state = 'firing'
|
||||||
|
AND overall_state_changed = true
|
||||||
|
AND org_id = '%s'
|
||||||
|
AND rule_id = '%s'
|
||||||
|
AND unix_milli >= %d AND unix_milli <= %d
|
||||||
|
),
|
||||||
|
resolution_events AS (
|
||||||
|
SELECT
|
||||||
|
rule_id,
|
||||||
|
state,
|
||||||
|
unix_milli AS resolution_time
|
||||||
|
FROM %s.%s
|
||||||
|
WHERE overall_state = 'inactive'
|
||||||
|
AND overall_state_changed = true
|
||||||
|
AND org_id = '%s'
|
||||||
|
AND rule_id = '%s'
|
||||||
|
AND unix_milli >= %d AND unix_milli <= %d
|
||||||
|
),
|
||||||
|
matched_events AS (
|
||||||
|
SELECT
|
||||||
|
f.rule_id,
|
||||||
|
f.state,
|
||||||
|
f.firing_time,
|
||||||
|
MIN(r.resolution_time) AS resolution_time
|
||||||
|
FROM firing_events f
|
||||||
|
LEFT JOIN resolution_events r
|
||||||
|
ON f.rule_id = r.rule_id
|
||||||
|
WHERE r.resolution_time > f.firing_time
|
||||||
|
GROUP BY f.rule_id, f.state, f.firing_time
|
||||||
|
)
|
||||||
|
SELECT toStartOfInterval(toDateTime(firing_time / 1000), INTERVAL %d SECOND) AS ts, AVG(resolution_time - firing_time) / 1000 AS avg_resolution_time
|
||||||
|
FROM matched_events
|
||||||
|
GROUP BY ts
|
||||||
|
ORDER BY ts ASC;`
|
||||||
|
|
||||||
|
query := fmt.Sprintf(tmpl,
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.Start, params.End,
|
||||||
|
signozHistoryDBName, ruleStateHistoryTableName, orgID, ruleID, params.Start, params.End, step)
|
||||||
|
|
||||||
|
return s.queryTimeSeries(ctx, query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *stateHistoryStore) queryTimeSeries(ctx context.Context, query string) (*alertmanagertypes.Series, error) {
|
||||||
|
rows, err := s.conn.Query(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
series := &alertmanagertypes.Series{
|
||||||
|
Labels: map[string]string{},
|
||||||
|
}
|
||||||
|
for rows.Next() {
|
||||||
|
var value float64
|
||||||
|
var ts interface{}
|
||||||
|
if err := rows.Scan(&value, &ts); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// The timestamp may come back as time.Time from ClickHouse.
|
||||||
|
var timestamp int64
|
||||||
|
switch v := ts.(type) {
|
||||||
|
case int64:
|
||||||
|
timestamp = v
|
||||||
|
default:
|
||||||
|
// Try time.Time
|
||||||
|
if t, ok := ts.(interface{ UnixMilli() int64 }); ok {
|
||||||
|
timestamp = t.UnixMilli()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
series.Points = append(series.Points, alertmanagertypes.Point{
|
||||||
|
Timestamp: timestamp,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(series.Points) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return series, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func minAllowedStepInterval(start, end int64) int64 {
|
||||||
|
step := (end - start) / maxPointsInTimeSeries / 1000
|
||||||
|
if step < 60 {
|
||||||
|
return 60
|
||||||
|
}
|
||||||
|
return step - step%60
|
||||||
|
}
|
||||||
@@ -0,0 +1,165 @@
|
|||||||
|
package sqlalertmanagerstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||||
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type maintenance struct {
|
||||||
|
sqlstore sqlstore.SQLStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMaintenanceStore(store sqlstore.SQLStore) alertmanagertypes.MaintenanceStore {
|
||||||
|
return &maintenance{sqlstore: store}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *maintenance) GetAllPlannedMaintenance(ctx context.Context, orgID string) ([]*alertmanagertypes.GettablePlannedMaintenance, error) {
|
||||||
|
storables := make([]*alertmanagertypes.StorablePlannedMaintenance, 0)
|
||||||
|
err := r.sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewSelect().
|
||||||
|
Model(&storables).
|
||||||
|
Where("org_id = ?", orgID).
|
||||||
|
Scan(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]*alertmanagertypes.GettablePlannedMaintenance, 0, len(storables))
|
||||||
|
for _, s := range storables {
|
||||||
|
result = append(result, alertmanagertypes.ConvertStorableToGettable(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *maintenance) GetPlannedMaintenanceByID(ctx context.Context, id valuer.UUID) (*alertmanagertypes.GettablePlannedMaintenance, error) {
|
||||||
|
storable := new(alertmanagertypes.StorablePlannedMaintenance)
|
||||||
|
err := r.sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewSelect().
|
||||||
|
Model(storable).
|
||||||
|
Where("id = ?", id.StringValue()).
|
||||||
|
Scan(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return alertmanagertypes.ConvertStorableToGettable(storable), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *maintenance) CreatePlannedMaintenance(ctx context.Context, maintenance alertmanagertypes.GettablePlannedMaintenance) (valuer.UUID, error) {
|
||||||
|
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return valuer.UUID{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var ruleIDsStr string
|
||||||
|
if len(maintenance.RuleIDs) > 0 {
|
||||||
|
data, err := json.Marshal(maintenance.RuleIDs)
|
||||||
|
if err != nil {
|
||||||
|
return valuer.UUID{}, err
|
||||||
|
}
|
||||||
|
ruleIDsStr = string(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
storable := alertmanagertypes.StorablePlannedMaintenance{
|
||||||
|
Identifiable: types.Identifiable{
|
||||||
|
ID: valuer.GenerateUUID(),
|
||||||
|
},
|
||||||
|
TimeAuditable: types.TimeAuditable{
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
UpdatedAt: time.Now(),
|
||||||
|
},
|
||||||
|
UserAuditable: types.UserAuditable{
|
||||||
|
CreatedBy: claims.Email,
|
||||||
|
UpdatedBy: claims.Email,
|
||||||
|
},
|
||||||
|
Name: maintenance.Name,
|
||||||
|
Description: maintenance.Description,
|
||||||
|
Schedule: maintenance.Schedule,
|
||||||
|
RuleIDs: ruleIDsStr,
|
||||||
|
Expression: maintenance.Expression,
|
||||||
|
OrgID: claims.OrgID,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = r.sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewInsert().
|
||||||
|
Model(&storable).
|
||||||
|
Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return valuer.UUID{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return storable.ID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *maintenance) DeletePlannedMaintenance(ctx context.Context, id valuer.UUID) error {
|
||||||
|
_, err := r.sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewDelete().
|
||||||
|
Model(new(alertmanagertypes.StorablePlannedMaintenance)).
|
||||||
|
Where("id = ?", id.StringValue()).
|
||||||
|
Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *maintenance) EditPlannedMaintenance(ctx context.Context, maintenance alertmanagertypes.GettablePlannedMaintenance, id valuer.UUID) error {
|
||||||
|
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var ruleIDsStr string
|
||||||
|
if len(maintenance.RuleIDs) > 0 {
|
||||||
|
data, err := json.Marshal(maintenance.RuleIDs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ruleIDsStr = string(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
storable := alertmanagertypes.StorablePlannedMaintenance{
|
||||||
|
Identifiable: types.Identifiable{
|
||||||
|
ID: id,
|
||||||
|
},
|
||||||
|
TimeAuditable: types.TimeAuditable{
|
||||||
|
CreatedAt: maintenance.CreatedAt,
|
||||||
|
UpdatedAt: time.Now(),
|
||||||
|
},
|
||||||
|
UserAuditable: types.UserAuditable{
|
||||||
|
CreatedBy: maintenance.CreatedBy,
|
||||||
|
UpdatedBy: claims.Email,
|
||||||
|
},
|
||||||
|
Name: maintenance.Name,
|
||||||
|
Description: maintenance.Description,
|
||||||
|
Schedule: maintenance.Schedule,
|
||||||
|
RuleIDs: ruleIDsStr,
|
||||||
|
Expression: maintenance.Expression,
|
||||||
|
OrgID: claims.OrgID,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = r.sqlstore.
|
||||||
|
BunDB().
|
||||||
|
NewUpdate().
|
||||||
|
Model(&storable).
|
||||||
|
Where("id = ?", storable.ID.StringValue()).
|
||||||
|
Exec(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -1845,3 +1845,216 @@ func (_c *MockAlertmanager_UpdateRoutePolicyByID_Call) RunAndReturn(run func(ctx
|
|||||||
_c.Call.Return(run)
|
_c.Call.Return(run)
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetAllPlannedMaintenance provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) GetAllPlannedMaintenance(ctx context.Context, orgID string) ([]*alertmanagertypes.GettablePlannedMaintenance, error) {
|
||||||
|
ret := _mock.Called(ctx, orgID)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for GetAllPlannedMaintenance")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 []*alertmanagertypes.GettablePlannedMaintenance
|
||||||
|
var r1 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, string) ([]*alertmanagertypes.GettablePlannedMaintenance, error)); ok {
|
||||||
|
return returnFunc(ctx, orgID)
|
||||||
|
}
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).([]*alertmanagertypes.GettablePlannedMaintenance)
|
||||||
|
}
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPlannedMaintenanceByID provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) GetPlannedMaintenanceByID(ctx context.Context, id valuer.UUID) (*alertmanagertypes.GettablePlannedMaintenance, error) {
|
||||||
|
ret := _mock.Called(ctx, id)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for GetPlannedMaintenanceByID")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 *alertmanagertypes.GettablePlannedMaintenance
|
||||||
|
var r1 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, valuer.UUID) (*alertmanagertypes.GettablePlannedMaintenance, error)); ok {
|
||||||
|
return returnFunc(ctx, id)
|
||||||
|
}
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*alertmanagertypes.GettablePlannedMaintenance)
|
||||||
|
}
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreatePlannedMaintenance provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) CreatePlannedMaintenance(ctx context.Context, maintenance alertmanagertypes.GettablePlannedMaintenance) (valuer.UUID, error) {
|
||||||
|
ret := _mock.Called(ctx, maintenance)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for CreatePlannedMaintenance")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 valuer.UUID
|
||||||
|
var r1 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, alertmanagertypes.GettablePlannedMaintenance) (valuer.UUID, error)); ok {
|
||||||
|
return returnFunc(ctx, maintenance)
|
||||||
|
}
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, alertmanagertypes.GettablePlannedMaintenance) valuer.UUID); ok {
|
||||||
|
r0 = returnFunc(ctx, maintenance)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Get(0).(valuer.UUID)
|
||||||
|
}
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// EditPlannedMaintenance provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) EditPlannedMaintenance(ctx context.Context, maintenance alertmanagertypes.GettablePlannedMaintenance, id valuer.UUID) error {
|
||||||
|
ret := _mock.Called(ctx, maintenance, id)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for EditPlannedMaintenance")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, alertmanagertypes.GettablePlannedMaintenance, valuer.UUID) error); ok {
|
||||||
|
r0 = returnFunc(ctx, maintenance, id)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeletePlannedMaintenance provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) DeletePlannedMaintenance(ctx context.Context, id valuer.UUID) error {
|
||||||
|
ret := _mock.Called(ctx, id)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for DeletePlannedMaintenance")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, valuer.UUID) error); ok {
|
||||||
|
r0 = returnFunc(ctx, id)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordRuleStateHistory provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) RecordRuleStateHistory(ctx context.Context, orgID string, entries []alertmanagertypes.RuleStateHistory) error {
|
||||||
|
ret := _mock.Called(ctx, orgID, entries)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for RecordRuleStateHistory")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, string, []alertmanagertypes.RuleStateHistory) error); ok {
|
||||||
|
r0 = returnFunc(ctx, orgID, entries)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLastSavedRuleStateHistory provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]alertmanagertypes.RuleStateHistory, error) {
|
||||||
|
ret := _mock.Called(ctx, ruleID)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for GetLastSavedRuleStateHistory")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 []alertmanagertypes.RuleStateHistory
|
||||||
|
var r1 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, string) ([]alertmanagertypes.RuleStateHistory, error)); ok {
|
||||||
|
return returnFunc(ctx, ruleID)
|
||||||
|
}
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).([]alertmanagertypes.RuleStateHistory)
|
||||||
|
}
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRuleStateHistoryTimeline provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) GetRuleStateHistoryTimeline(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStateTimeline, error) {
|
||||||
|
ret := _mock.Called(ctx, orgID, ruleID, params)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for GetRuleStateHistoryTimeline")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 *alertmanagertypes.RuleStateTimeline
|
||||||
|
var r1 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, string, string, *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStateTimeline, error)); ok {
|
||||||
|
return returnFunc(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*alertmanagertypes.RuleStateTimeline)
|
||||||
|
}
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRuleStateHistoryTopContributors provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) GetRuleStateHistoryTopContributors(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateHistoryContributor, error) {
|
||||||
|
ret := _mock.Called(ctx, orgID, ruleID, params)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for GetRuleStateHistoryTopContributors")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 []alertmanagertypes.RuleStateHistoryContributor
|
||||||
|
var r1 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, string, string, *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateHistoryContributor, error)); ok {
|
||||||
|
return returnFunc(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).([]alertmanagertypes.RuleStateHistoryContributor)
|
||||||
|
}
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOverallStateTransitions provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) GetOverallStateTransitions(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateTransition, error) {
|
||||||
|
ret := _mock.Called(ctx, orgID, ruleID, params)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for GetOverallStateTransitions")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 []alertmanagertypes.RuleStateTransition
|
||||||
|
var r1 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, string, string, *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateTransition, error)); ok {
|
||||||
|
return returnFunc(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).([]alertmanagertypes.RuleStateTransition)
|
||||||
|
}
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRuleStats provides a mock function for the type MockAlertmanager
|
||||||
|
func (_mock *MockAlertmanager) GetRuleStats(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStats, error) {
|
||||||
|
ret := _mock.Called(ctx, orgID, ruleID, params)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for GetRuleStats")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 *alertmanagertypes.RuleStats
|
||||||
|
var r1 error
|
||||||
|
if returnFunc, ok := ret.Get(0).(func(context.Context, string, string, *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStats, error)); ok {
|
||||||
|
return returnFunc(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*alertmanagertypes.RuleStats)
|
||||||
|
}
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/errors"
|
"github.com/SigNoz/signoz/pkg/errors"
|
||||||
@@ -399,3 +400,312 @@ func (api *API) UpdateRoutePolicy(rw http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
render.Success(rw, http.StatusOK, result)
|
render.Success(rw, http.StatusOK, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (api *API) ListDowntimeSchedules(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
schedules, err := api.alertmanager.GetAllPlannedMaintenance(ctx, claims.OrgID)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if q := req.URL.Query().Get("active"); q != "" {
|
||||||
|
active, _ := strconv.ParseBool(q)
|
||||||
|
filtered := make([]*alertmanagertypes.GettablePlannedMaintenance, 0)
|
||||||
|
for _, schedule := range schedules {
|
||||||
|
now := time.Now().In(time.FixedZone(schedule.Schedule.Timezone, 0))
|
||||||
|
if schedule.IsActive(now) == active {
|
||||||
|
filtered = append(filtered, schedule)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
schedules = filtered
|
||||||
|
}
|
||||||
|
|
||||||
|
if q := req.URL.Query().Get("recurring"); q != "" {
|
||||||
|
recurring, _ := strconv.ParseBool(q)
|
||||||
|
filtered := make([]*alertmanagertypes.GettablePlannedMaintenance, 0)
|
||||||
|
for _, schedule := range schedules {
|
||||||
|
if schedule.IsRecurring() == recurring {
|
||||||
|
filtered = append(filtered, schedule)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
schedules = filtered
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, schedules)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) GetDowntimeSchedule(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
vars := mux.Vars(req)
|
||||||
|
idString, ok := vars["id"]
|
||||||
|
if !ok {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
id, err := valuer.NewUUID(idString)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid uuid-v7"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
schedule, err := api.alertmanager.GetPlannedMaintenanceByID(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, schedule)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) CreateDowntimeSchedule(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
body, err := io.ReadAll(req.Body)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer req.Body.Close() //nolint:errcheck
|
||||||
|
|
||||||
|
var schedule alertmanagertypes.GettablePlannedMaintenance
|
||||||
|
if err := json.Unmarshal(body, &schedule); err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid request body: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := schedule.Validate(); err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = api.alertmanager.CreatePlannedMaintenance(ctx, schedule)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) EditDowntimeSchedule(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
vars := mux.Vars(req)
|
||||||
|
idString, ok := vars["id"]
|
||||||
|
if !ok {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
id, err := valuer.NewUUID(idString)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid uuid-v7"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(req.Body)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer req.Body.Close() //nolint:errcheck
|
||||||
|
|
||||||
|
var schedule alertmanagertypes.GettablePlannedMaintenance
|
||||||
|
if err := json.Unmarshal(body, &schedule); err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid request body: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := schedule.Validate(); err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = api.alertmanager.EditPlannedMaintenance(ctx, schedule, id)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) DeleteDowntimeSchedule(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
vars := mux.Vars(req)
|
||||||
|
idString, ok := vars["id"]
|
||||||
|
if !ok {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
id, err := valuer.NewUUID(idString)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid uuid-v7"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = api.alertmanager.DeletePlannedMaintenance(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusNoContent, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) GetRuleStateHistoryTimeline(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ruleID := mux.Vars(req)["id"]
|
||||||
|
if ruleID == "" {
|
||||||
|
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "rule ID is required"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var params alertmanagertypes.QueryRuleStateHistory
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(¶ms); err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid request body: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := params.Validate(); err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := api.alertmanager.GetRuleStateHistoryTimeline(ctx, claims.OrgID, ruleID, ¶ms)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) GetRuleStats(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ruleID := mux.Vars(req)["id"]
|
||||||
|
if ruleID == "" {
|
||||||
|
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "rule ID is required"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var params alertmanagertypes.QueryRuleStateHistory
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(¶ms); err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid request body: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := params.Validate(); err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := api.alertmanager.GetRuleStats(ctx, claims.OrgID, ruleID, ¶ms)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) GetRuleStateHistoryTopContributors(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ruleID := mux.Vars(req)["id"]
|
||||||
|
if ruleID == "" {
|
||||||
|
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "rule ID is required"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var params alertmanagertypes.QueryRuleStateHistory
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(¶ms); err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid request body: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := params.Validate(); err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := api.alertmanager.GetRuleStateHistoryTopContributors(ctx, claims.OrgID, ruleID, ¶ms)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) GetOverallStateTransitions(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ruleID := mux.Vars(req)["id"]
|
||||||
|
if ruleID == "" {
|
||||||
|
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "rule ID is required"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var params alertmanagertypes.QueryRuleStateHistory
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(¶ms); err != nil {
|
||||||
|
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid request body: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := params.Validate(); err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := api.alertmanager.GetOverallStateTransitions(ctx, claims.OrgID, ruleID, ¶ms)
|
||||||
|
if err != nil {
|
||||||
|
render.Error(rw, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
render.Success(rw, http.StatusOK, result)
|
||||||
|
}
|
||||||
|
|||||||
1123
pkg/alertmanager/api_test.go
Normal file
1123
pkg/alertmanager/api_test.go
Normal file
File diff suppressed because it is too large
Load Diff
@@ -2,10 +2,15 @@ package alertmanager
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/alertmanager/featurecontrol"
|
"github.com/prometheus/alertmanager/featurecontrol"
|
||||||
"github.com/prometheus/alertmanager/matcher/compat"
|
"github.com/prometheus/alertmanager/matcher/compat"
|
||||||
|
amtypes "github.com/prometheus/alertmanager/types"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/alertmanager/alertmanagerserver"
|
"github.com/SigNoz/signoz/pkg/alertmanager/alertmanagerserver"
|
||||||
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager"
|
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager"
|
||||||
@@ -38,6 +43,15 @@ type Service struct {
|
|||||||
serversMtx sync.RWMutex
|
serversMtx sync.RWMutex
|
||||||
|
|
||||||
notificationManager nfmanager.NotificationManager
|
notificationManager nfmanager.NotificationManager
|
||||||
|
|
||||||
|
// maintenanceExprMuter is an optional muter for expression-based maintenance scoping
|
||||||
|
maintenanceExprMuter amtypes.Muter
|
||||||
|
|
||||||
|
// stateHistoryStore writes rule state history to persistent storage (e.g. ClickHouse)
|
||||||
|
stateHistoryStore alertmanagertypes.StateHistoryStore
|
||||||
|
|
||||||
|
// stateTracker tracks alert state transitions for v2 state history recording
|
||||||
|
stateTracker *stateTracker
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(
|
func New(
|
||||||
@@ -48,16 +62,21 @@ func New(
|
|||||||
configStore alertmanagertypes.ConfigStore,
|
configStore alertmanagertypes.ConfigStore,
|
||||||
orgGetter organization.Getter,
|
orgGetter organization.Getter,
|
||||||
nfManager nfmanager.NotificationManager,
|
nfManager nfmanager.NotificationManager,
|
||||||
|
maintenanceExprMuter amtypes.Muter,
|
||||||
|
stateHistoryStore alertmanagertypes.StateHistoryStore,
|
||||||
) *Service {
|
) *Service {
|
||||||
service := &Service{
|
service := &Service{
|
||||||
config: config,
|
config: config,
|
||||||
stateStore: stateStore,
|
stateStore: stateStore,
|
||||||
configStore: configStore,
|
configStore: configStore,
|
||||||
orgGetter: orgGetter,
|
orgGetter: orgGetter,
|
||||||
settings: settings,
|
settings: settings,
|
||||||
servers: make(map[string]*alertmanagerserver.Server),
|
servers: make(map[string]*alertmanagerserver.Server),
|
||||||
serversMtx: sync.RWMutex{},
|
serversMtx: sync.RWMutex{},
|
||||||
notificationManager: nfManager,
|
notificationManager: nfManager,
|
||||||
|
maintenanceExprMuter: maintenanceExprMuter,
|
||||||
|
stateHistoryStore: stateHistoryStore,
|
||||||
|
stateTracker: newStateTracker(),
|
||||||
}
|
}
|
||||||
|
|
||||||
return service
|
return service
|
||||||
@@ -131,7 +150,21 @@ func (service *Service) PutAlerts(ctx context.Context, orgID string, alerts aler
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return server.PutAlerts(ctx, alerts)
|
// Convert to typed alerts for state tracking (same conversion the server does).
|
||||||
|
now := time.Now()
|
||||||
|
typedAlerts, _ := alertmanagertypes.NewAlertsFromPostableAlerts(
|
||||||
|
alerts, time.Duration(service.config.Global.ResolveTimeout), now,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Delegate to server for notification pipeline.
|
||||||
|
if err := server.PutAlerts(ctx, alerts); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record state history from the incoming alerts.
|
||||||
|
service.recordStateHistoryFromAlerts(ctx, orgID, typedAlerts, now)
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (service *Service) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
func (service *Service) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error {
|
||||||
@@ -176,7 +209,7 @@ func (service *Service) newServer(ctx context.Context, orgID string) (*alertmana
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
server, err := alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore, service.notificationManager)
|
server, err := alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore, service.notificationManager, service.maintenanceExprMuter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -255,6 +288,205 @@ func (service *Service) compareAndSelectConfig(ctx context.Context, incomingConf
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RecordRuleStateHistory applies maintenance muting logic and writes state history entries.
|
||||||
|
// For each entry with State=="firing", if the maintenance muter matches the entry's labels,
|
||||||
|
// the state is changed to "muted" before writing.
|
||||||
|
func (service *Service) RecordRuleStateHistory(ctx context.Context, orgID string, entries []alertmanagertypes.RuleStateHistory) error {
|
||||||
|
if service.stateHistoryStore == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range entries {
|
||||||
|
entries[i].OrgID = orgID
|
||||||
|
}
|
||||||
|
|
||||||
|
if service.maintenanceExprMuter != nil {
|
||||||
|
for i := range entries {
|
||||||
|
if entries[i].State != "firing" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lbls := labelsFromJSON(entries[i].Labels)
|
||||||
|
if lbls == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Add ruleId to the label set for muter matching.
|
||||||
|
lbls["ruleId"] = model.LabelValue(entries[i].RuleID)
|
||||||
|
if service.maintenanceExprMuter.Mutes(lbls) {
|
||||||
|
entries[i].State = "muted"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return service.stateHistoryStore.WriteRuleStateHistory(ctx, entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (service *Service) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]alertmanagertypes.RuleStateHistory, error) {
|
||||||
|
if service.stateHistoryStore == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return service.stateHistoryStore.GetLastSavedRuleStateHistory(ctx, ruleID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (service *Service) GetRuleStateHistoryTimeline(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStateTimeline, error) {
|
||||||
|
if service.stateHistoryStore == nil {
|
||||||
|
return &alertmanagertypes.RuleStateTimeline{Items: []alertmanagertypes.RuleStateHistory{}}, nil
|
||||||
|
}
|
||||||
|
return service.stateHistoryStore.GetRuleStateHistoryTimeline(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (service *Service) GetRuleStateHistoryTopContributors(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateHistoryContributor, error) {
|
||||||
|
if service.stateHistoryStore == nil {
|
||||||
|
return []alertmanagertypes.RuleStateHistoryContributor{}, nil
|
||||||
|
}
|
||||||
|
return service.stateHistoryStore.GetRuleStateHistoryTopContributors(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (service *Service) GetOverallStateTransitions(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateTransition, error) {
|
||||||
|
if service.stateHistoryStore == nil {
|
||||||
|
return []alertmanagertypes.RuleStateTransition{}, nil
|
||||||
|
}
|
||||||
|
return service.stateHistoryStore.GetOverallStateTransitions(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (service *Service) GetRuleStats(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStats, error) {
|
||||||
|
if service.stateHistoryStore == nil {
|
||||||
|
return &alertmanagertypes.RuleStats{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
store := service.stateHistoryStore
|
||||||
|
|
||||||
|
// Current period stats.
|
||||||
|
totalCurrentTriggers, err := store.GetTotalTriggers(ctx, orgID, ruleID, params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
currentTriggersSeries, err := store.GetTriggersByInterval(ctx, orgID, ruleID, params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
currentAvgResolutionTime, err := store.GetAvgResolutionTime(ctx, orgID, ruleID, params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
currentAvgResolutionTimeSeries, err := store.GetAvgResolutionTimeByInterval(ctx, orgID, ruleID, params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Past period stats — shift time window backward.
|
||||||
|
pastParams := *params
|
||||||
|
duration := params.End - params.Start
|
||||||
|
if duration >= 86400000 {
|
||||||
|
days := int64(math.Ceil(float64(duration) / 86400000))
|
||||||
|
pastParams.Start -= days * 86400000
|
||||||
|
pastParams.End -= days * 86400000
|
||||||
|
} else {
|
||||||
|
pastParams.Start -= 86400000
|
||||||
|
pastParams.End -= 86400000
|
||||||
|
}
|
||||||
|
|
||||||
|
totalPastTriggers, err := store.GetTotalTriggers(ctx, orgID, ruleID, &pastParams)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pastTriggersSeries, err := store.GetTriggersByInterval(ctx, orgID, ruleID, &pastParams)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pastAvgResolutionTime, err := store.GetAvgResolutionTime(ctx, orgID, ruleID, &pastParams)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pastAvgResolutionTimeSeries, err := store.GetAvgResolutionTimeByInterval(ctx, orgID, ruleID, &pastParams)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if math.IsNaN(currentAvgResolutionTime) || math.IsInf(currentAvgResolutionTime, 0) {
|
||||||
|
currentAvgResolutionTime = 0
|
||||||
|
}
|
||||||
|
if math.IsNaN(pastAvgResolutionTime) || math.IsInf(pastAvgResolutionTime, 0) {
|
||||||
|
pastAvgResolutionTime = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return &alertmanagertypes.RuleStats{
|
||||||
|
TotalCurrentTriggers: totalCurrentTriggers,
|
||||||
|
TotalPastTriggers: totalPastTriggers,
|
||||||
|
CurrentTriggersSeries: currentTriggersSeries,
|
||||||
|
PastTriggersSeries: pastTriggersSeries,
|
||||||
|
CurrentAvgResolutionTime: currentAvgResolutionTime,
|
||||||
|
PastAvgResolutionTime: pastAvgResolutionTime,
|
||||||
|
CurrentAvgResolutionTimeSeries: currentAvgResolutionTimeSeries,
|
||||||
|
PastAvgResolutionTimeSeries: pastAvgResolutionTimeSeries,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// recordStateHistoryFromAlerts detects state transitions from incoming alerts
|
||||||
|
// and records them via RecordRuleStateHistory (which applies maintenance muting).
|
||||||
|
func (service *Service) recordStateHistoryFromAlerts(ctx context.Context, orgID string, alerts []*amtypes.Alert, now time.Time) {
|
||||||
|
if service.stateHistoryStore == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := service.stateTracker.processAlerts(orgID, alerts, now)
|
||||||
|
if len(entries) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := service.RecordRuleStateHistory(ctx, orgID, entries); err != nil {
|
||||||
|
service.settings.Logger().ErrorContext(ctx, "failed to record state history", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartStateHistorySweep starts a background goroutine that periodically checks
|
||||||
|
// for stale firing alerts and records them as resolved. Call this once after creating the service.
|
||||||
|
func (service *Service) StartStateHistorySweep(ctx context.Context) {
|
||||||
|
if service.stateHistoryStore == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
staleTimeout := 2 * time.Duration(service.config.Global.ResolveTimeout)
|
||||||
|
if staleTimeout == 0 {
|
||||||
|
staleTimeout = 10 * time.Minute
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(1 * time.Minute)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
now := time.Now()
|
||||||
|
entriesByOrg := service.stateTracker.sweepStale(staleTimeout, now)
|
||||||
|
for orgID, orgEntries := range entriesByOrg {
|
||||||
|
if err := service.RecordRuleStateHistory(ctx, orgID, orgEntries); err != nil {
|
||||||
|
service.settings.Logger().ErrorContext(ctx, "failed to record stale state history", "org_id", orgID, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// labelsFromJSON parses a JSON string of labels into a model.LabelSet.
|
||||||
|
func labelsFromJSON(labelsJSON string) model.LabelSet {
|
||||||
|
if labelsJSON == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var m map[string]string
|
||||||
|
if err := json.Unmarshal([]byte(labelsJSON), &m); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ls := make(model.LabelSet, len(m))
|
||||||
|
for k, v := range m {
|
||||||
|
ls[model.LabelName(k)] = model.LabelValue(v)
|
||||||
|
}
|
||||||
|
return ls
|
||||||
|
}
|
||||||
|
|
||||||
// getServer returns the server for the given orgID. It should be called with the lock held.
|
// getServer returns the server for the given orgID. It should be called with the lock held.
|
||||||
func (service *Service) getServer(orgID string) (*alertmanagerserver.Server, error) {
|
func (service *Service) getServer(orgID string) (*alertmanagerserver.Server, error) {
|
||||||
server, ok := service.servers[orgID]
|
server, ok := service.servers[orgID]
|
||||||
|
|||||||
426
pkg/alertmanager/service_test.go
Normal file
426
pkg/alertmanager/service_test.go
Normal file
@@ -0,0 +1,426 @@
|
|||||||
|
package alertmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// testMuter implements amtypes.Muter for testing.
|
||||||
|
type testMuter struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
muteFunc func(model.LabelSet) bool
|
||||||
|
calls []model.LabelSet
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testMuter) Mutes(labels model.LabelSet) bool {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.calls = append(m.calls, labels)
|
||||||
|
if m.muteFunc != nil {
|
||||||
|
return m.muteFunc(labels)
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *testMuter) getCalls() []model.LabelSet {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
result := make([]model.LabelSet, len(m.calls))
|
||||||
|
copy(result, m.calls)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// fakeStateHistoryStore captures calls for assertion.
|
||||||
|
type fakeStateHistoryStore struct {
|
||||||
|
written []alertmanagertypes.RuleStateHistory
|
||||||
|
lastErr error
|
||||||
|
getResult []alertmanagertypes.RuleStateHistory
|
||||||
|
getErr error
|
||||||
|
|
||||||
|
// Stats method returns
|
||||||
|
totalTriggers uint64
|
||||||
|
totalTriggersErr error
|
||||||
|
triggersSeries *alertmanagertypes.Series
|
||||||
|
triggersSeriesErr error
|
||||||
|
avgResolutionTime float64
|
||||||
|
avgResolutionTimeErr error
|
||||||
|
avgResTimeSeries *alertmanagertypes.Series
|
||||||
|
avgResTimeSeriesErr error
|
||||||
|
|
||||||
|
// Captures params passed to stats methods
|
||||||
|
statsCalls []*alertmanagertypes.QueryRuleStateHistory
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *fakeStateHistoryStore) WriteRuleStateHistory(_ context.Context, entries []alertmanagertypes.RuleStateHistory) error {
|
||||||
|
w.written = append(w.written, entries...)
|
||||||
|
return w.lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *fakeStateHistoryStore) GetLastSavedRuleStateHistory(_ context.Context, _ string) ([]alertmanagertypes.RuleStateHistory, error) {
|
||||||
|
return w.getResult, w.getErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *fakeStateHistoryStore) GetRuleStateHistoryTimeline(_ context.Context, _ string, _ string, _ *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStateTimeline, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *fakeStateHistoryStore) GetRuleStateHistoryTopContributors(_ context.Context, _ string, _ string, _ *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateHistoryContributor, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *fakeStateHistoryStore) GetOverallStateTransitions(_ context.Context, _ string, _ string, _ *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateTransition, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *fakeStateHistoryStore) GetTotalTriggers(_ context.Context, _ string, _ string, params *alertmanagertypes.QueryRuleStateHistory) (uint64, error) {
|
||||||
|
w.statsCalls = append(w.statsCalls, params)
|
||||||
|
return w.totalTriggers, w.totalTriggersErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *fakeStateHistoryStore) GetTriggersByInterval(_ context.Context, _ string, _ string, params *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.Series, error) {
|
||||||
|
w.statsCalls = append(w.statsCalls, params)
|
||||||
|
return w.triggersSeries, w.triggersSeriesErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *fakeStateHistoryStore) GetAvgResolutionTime(_ context.Context, _ string, _ string, params *alertmanagertypes.QueryRuleStateHistory) (float64, error) {
|
||||||
|
w.statsCalls = append(w.statsCalls, params)
|
||||||
|
return w.avgResolutionTime, w.avgResolutionTimeErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *fakeStateHistoryStore) GetAvgResolutionTimeByInterval(_ context.Context, _ string, _ string, params *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.Series, error) {
|
||||||
|
w.statsCalls = append(w.statsCalls, params)
|
||||||
|
return w.avgResTimeSeries, w.avgResTimeSeriesErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLabelsFromJSON(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
input string
|
||||||
|
want model.LabelSet
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty string",
|
||||||
|
input: "",
|
||||||
|
want: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid json",
|
||||||
|
input: "not json",
|
||||||
|
want: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid labels",
|
||||||
|
input: `{"env":"prod","severity":"critical"}`,
|
||||||
|
want: model.LabelSet{
|
||||||
|
"env": "prod",
|
||||||
|
"severity": "critical",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty object",
|
||||||
|
input: `{}`,
|
||||||
|
want: model.LabelSet{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "single label",
|
||||||
|
input: `{"alertname":"HighCPU"}`,
|
||||||
|
want: model.LabelSet{"alertname": "HighCPU"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
got := labelsFromJSON(tc.input)
|
||||||
|
assert.Equal(t, tc.want, got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRecordRuleStateHistory(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
t.Run("nil writer returns nil", func(t *testing.T) {
|
||||||
|
svc := &Service{stateHistoryStore: nil}
|
||||||
|
err := svc.RecordRuleStateHistory(ctx, "org-1", []alertmanagertypes.RuleStateHistory{
|
||||||
|
{RuleID: "r1", State: "firing"},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("no muter writes entries unchanged", func(t *testing.T) {
|
||||||
|
writer := &fakeStateHistoryStore{}
|
||||||
|
svc := &Service{
|
||||||
|
stateHistoryStore: writer,
|
||||||
|
maintenanceExprMuter: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := []alertmanagertypes.RuleStateHistory{
|
||||||
|
{RuleID: "r1", State: "firing", Labels: `{"env":"prod"}`},
|
||||||
|
{RuleID: "r2", State: "normal", Labels: `{"env":"staging"}`},
|
||||||
|
}
|
||||||
|
err := svc.RecordRuleStateHistory(ctx, "org-1", entries)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, writer.written, 2)
|
||||||
|
assert.Equal(t, "firing", writer.written[0].State)
|
||||||
|
assert.Equal(t, "normal", writer.written[1].State)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("muter changes firing to muted when matched", func(t *testing.T) {
|
||||||
|
writer := &fakeStateHistoryStore{}
|
||||||
|
muter := &testMuter{
|
||||||
|
muteFunc: func(ls model.LabelSet) bool {
|
||||||
|
return ls["env"] == "prod"
|
||||||
|
},
|
||||||
|
}
|
||||||
|
svc := &Service{
|
||||||
|
stateHistoryStore: writer,
|
||||||
|
maintenanceExprMuter: muter,
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := []alertmanagertypes.RuleStateHistory{
|
||||||
|
{RuleID: "r1", State: "firing", Labels: `{"env":"prod"}`},
|
||||||
|
}
|
||||||
|
err := svc.RecordRuleStateHistory(ctx, "org-1", entries)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, writer.written, 1)
|
||||||
|
assert.Equal(t, "muted", writer.written[0].State)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("muter does not change firing when not matched", func(t *testing.T) {
|
||||||
|
writer := &fakeStateHistoryStore{}
|
||||||
|
muter := &testMuter{
|
||||||
|
muteFunc: func(ls model.LabelSet) bool {
|
||||||
|
return ls["env"] == "prod"
|
||||||
|
},
|
||||||
|
}
|
||||||
|
svc := &Service{
|
||||||
|
stateHistoryStore: writer,
|
||||||
|
maintenanceExprMuter: muter,
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := []alertmanagertypes.RuleStateHistory{
|
||||||
|
{RuleID: "r1", State: "firing", Labels: `{"env":"staging"}`},
|
||||||
|
}
|
||||||
|
err := svc.RecordRuleStateHistory(ctx, "org-1", entries)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, writer.written, 1)
|
||||||
|
assert.Equal(t, "firing", writer.written[0].State)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("muter only affects firing entries", func(t *testing.T) {
|
||||||
|
writer := &fakeStateHistoryStore{}
|
||||||
|
muter := &testMuter{
|
||||||
|
muteFunc: func(model.LabelSet) bool { return true }, // mute everything
|
||||||
|
}
|
||||||
|
svc := &Service{
|
||||||
|
stateHistoryStore: writer,
|
||||||
|
maintenanceExprMuter: muter,
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := []alertmanagertypes.RuleStateHistory{
|
||||||
|
{RuleID: "r1", State: "normal", Labels: `{"env":"prod"}`},
|
||||||
|
{RuleID: "r2", State: "no_data", Labels: `{"env":"prod"}`},
|
||||||
|
{RuleID: "r3", State: "firing", Labels: `{"env":"prod"}`},
|
||||||
|
}
|
||||||
|
err := svc.RecordRuleStateHistory(ctx, "org-1", entries)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, writer.written, 3)
|
||||||
|
assert.Equal(t, "normal", writer.written[0].State, "normal should not be muted")
|
||||||
|
assert.Equal(t, "no_data", writer.written[1].State, "no_data should not be muted")
|
||||||
|
assert.Equal(t, "muted", writer.written[2].State, "firing should become muted")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ruleId is injected into labels for muter evaluation", func(t *testing.T) {
|
||||||
|
writer := &fakeStateHistoryStore{}
|
||||||
|
muter := &testMuter{
|
||||||
|
muteFunc: func(ls model.LabelSet) bool {
|
||||||
|
return ls["ruleId"] == "target-rule"
|
||||||
|
},
|
||||||
|
}
|
||||||
|
svc := &Service{
|
||||||
|
stateHistoryStore: writer,
|
||||||
|
maintenanceExprMuter: muter,
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := []alertmanagertypes.RuleStateHistory{
|
||||||
|
{RuleID: "target-rule", State: "firing", Labels: `{"env":"prod"}`},
|
||||||
|
{RuleID: "other-rule", State: "firing", Labels: `{"env":"prod"}`},
|
||||||
|
}
|
||||||
|
err := svc.RecordRuleStateHistory(ctx, "org-1", entries)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, writer.written, 2)
|
||||||
|
assert.Equal(t, "muted", writer.written[0].State, "target-rule should be muted")
|
||||||
|
assert.Equal(t, "firing", writer.written[1].State, "other-rule should not be muted")
|
||||||
|
|
||||||
|
// Verify the muter received labels with ruleId injected
|
||||||
|
calls := muter.getCalls()
|
||||||
|
require.Len(t, calls, 2)
|
||||||
|
assert.Equal(t, model.LabelValue("target-rule"), calls[0]["ruleId"])
|
||||||
|
assert.Equal(t, model.LabelValue("other-rule"), calls[1]["ruleId"])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("invalid labels JSON skips muting check", func(t *testing.T) {
|
||||||
|
writer := &fakeStateHistoryStore{}
|
||||||
|
muter := &testMuter{
|
||||||
|
muteFunc: func(model.LabelSet) bool { return true },
|
||||||
|
}
|
||||||
|
svc := &Service{
|
||||||
|
stateHistoryStore: writer,
|
||||||
|
maintenanceExprMuter: muter,
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := []alertmanagertypes.RuleStateHistory{
|
||||||
|
{RuleID: "r1", State: "firing", Labels: "not-json"},
|
||||||
|
{RuleID: "r2", State: "firing", Labels: ""},
|
||||||
|
}
|
||||||
|
err := svc.RecordRuleStateHistory(ctx, "org-1", entries)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, writer.written, 2)
|
||||||
|
// Both should stay firing because labels couldn't be parsed
|
||||||
|
assert.Equal(t, "firing", writer.written[0].State)
|
||||||
|
assert.Equal(t, "firing", writer.written[1].State)
|
||||||
|
// Muter should not have been called
|
||||||
|
assert.Empty(t, muter.getCalls())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("mixed entries with selective muting", func(t *testing.T) {
|
||||||
|
writer := &fakeStateHistoryStore{}
|
||||||
|
muter := &testMuter{
|
||||||
|
muteFunc: func(ls model.LabelSet) bool {
|
||||||
|
return ls["severity"] == "warning"
|
||||||
|
},
|
||||||
|
}
|
||||||
|
svc := &Service{
|
||||||
|
stateHistoryStore: writer,
|
||||||
|
maintenanceExprMuter: muter,
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := []alertmanagertypes.RuleStateHistory{
|
||||||
|
{RuleID: "r1", State: "firing", Labels: `{"severity":"critical"}`, Fingerprint: 1},
|
||||||
|
{RuleID: "r2", State: "firing", Labels: `{"severity":"warning"}`, Fingerprint: 2},
|
||||||
|
{RuleID: "r3", State: "normal", Labels: `{"severity":"warning"}`, Fingerprint: 3},
|
||||||
|
{RuleID: "r4", State: "firing", Labels: `{"severity":"warning"}`, Fingerprint: 4},
|
||||||
|
}
|
||||||
|
err := svc.RecordRuleStateHistory(ctx, "org-1", entries)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, writer.written, 4)
|
||||||
|
assert.Equal(t, "firing", writer.written[0].State, "critical firing stays firing")
|
||||||
|
assert.Equal(t, "muted", writer.written[1].State, "warning firing becomes muted")
|
||||||
|
assert.Equal(t, "normal", writer.written[2].State, "normal is never muted")
|
||||||
|
assert.Equal(t, "muted", writer.written[3].State, "warning firing becomes muted")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetLastSavedRuleStateHistory(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
t.Run("nil writer returns nil", func(t *testing.T) {
|
||||||
|
svc := &Service{stateHistoryStore: nil}
|
||||||
|
result, err := svc.GetLastSavedRuleStateHistory(ctx, "r1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Nil(t, result)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("delegates to writer", func(t *testing.T) {
|
||||||
|
expected := []alertmanagertypes.RuleStateHistory{
|
||||||
|
{RuleID: "r1", State: "firing", Fingerprint: 123},
|
||||||
|
}
|
||||||
|
writer := &fakeStateHistoryStore{getResult: expected}
|
||||||
|
svc := &Service{stateHistoryStore: writer}
|
||||||
|
|
||||||
|
result, err := svc.GetLastSavedRuleStateHistory(ctx, "r1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, expected, result)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetRuleStats(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
t.Run("aggregates current and past period stats", func(t *testing.T) {
|
||||||
|
currentSeries := &alertmanagertypes.Series{Points: []alertmanagertypes.Point{{Timestamp: 1000, Value: 5}}}
|
||||||
|
currentResSeries := &alertmanagertypes.Series{Points: []alertmanagertypes.Point{{Timestamp: 1000, Value: 120}}}
|
||||||
|
store := &fakeStateHistoryStore{
|
||||||
|
totalTriggers: 10,
|
||||||
|
triggersSeries: currentSeries,
|
||||||
|
avgResolutionTime: 300.5,
|
||||||
|
avgResTimeSeries: currentResSeries,
|
||||||
|
}
|
||||||
|
svc := &Service{stateHistoryStore: store}
|
||||||
|
|
||||||
|
params := &alertmanagertypes.QueryRuleStateHistory{
|
||||||
|
Start: 1000,
|
||||||
|
End: 90000000, // ~1 day
|
||||||
|
}
|
||||||
|
result, err := svc.GetRuleStats(ctx, "org-1", "rule-1", params)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, uint64(10), result.TotalCurrentTriggers)
|
||||||
|
assert.Equal(t, uint64(10), result.TotalPastTriggers)
|
||||||
|
assert.Equal(t, currentSeries, result.CurrentTriggersSeries)
|
||||||
|
assert.Equal(t, 300.5, result.CurrentAvgResolutionTime)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("period shifting for duration >= 1 day", func(t *testing.T) {
|
||||||
|
store := &fakeStateHistoryStore{}
|
||||||
|
svc := &Service{stateHistoryStore: store}
|
||||||
|
|
||||||
|
// 2-day window: Start=0, End=172800000 (2 days in millis)
|
||||||
|
params := &alertmanagertypes.QueryRuleStateHistory{
|
||||||
|
Start: 0,
|
||||||
|
End: 172800000,
|
||||||
|
}
|
||||||
|
_, err := svc.GetRuleStats(ctx, "org-1", "rule-1", params)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// First 4 calls are current period, next 4 are past period.
|
||||||
|
// For 2 days: ceil(172800000/86400000) = 2, shift = 2*86400000 = 172800000
|
||||||
|
require.GreaterOrEqual(t, len(store.statsCalls), 8)
|
||||||
|
pastParams := store.statsCalls[4] // first past period call
|
||||||
|
assert.Equal(t, int64(-172800000), pastParams.Start)
|
||||||
|
assert.Equal(t, int64(0), pastParams.End)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("period shifting for duration < 1 day", func(t *testing.T) {
|
||||||
|
store := &fakeStateHistoryStore{}
|
||||||
|
svc := &Service{stateHistoryStore: store}
|
||||||
|
|
||||||
|
// 1-hour window
|
||||||
|
params := &alertmanagertypes.QueryRuleStateHistory{
|
||||||
|
Start: 100000000,
|
||||||
|
End: 103600000, // 3600000ms = 1 hour
|
||||||
|
}
|
||||||
|
_, err := svc.GetRuleStats(ctx, "org-1", "rule-1", params)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// For < 1 day: shift by exactly 1 day (86400000ms)
|
||||||
|
require.GreaterOrEqual(t, len(store.statsCalls), 8)
|
||||||
|
pastParams := store.statsCalls[4]
|
||||||
|
assert.Equal(t, int64(100000000-86400000), pastParams.Start)
|
||||||
|
assert.Equal(t, int64(103600000-86400000), pastParams.End)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("NaN and Inf avg resolution times are zeroed", func(t *testing.T) {
|
||||||
|
for _, val := range []float64{math.NaN(), math.Inf(1), math.Inf(-1)} {
|
||||||
|
store := &fakeStateHistoryStore{
|
||||||
|
avgResolutionTime: val,
|
||||||
|
}
|
||||||
|
svc := &Service{stateHistoryStore: store}
|
||||||
|
|
||||||
|
result, err := svc.GetRuleStats(ctx, "org-1", "rule-1", &alertmanagertypes.QueryRuleStateHistory{
|
||||||
|
Start: 0, End: 100000000,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, float64(0), result.CurrentAvgResolutionTime)
|
||||||
|
assert.Equal(t, float64(0), result.PastAvgResolutionTime)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
150
pkg/alertmanager/signozalertmanager/expreval.go
Normal file
150
pkg/alertmanager/signozalertmanager/expreval.go
Normal file
@@ -0,0 +1,150 @@
|
|||||||
|
package signozalertmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/expr-lang/expr"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
// convertLabelSetToEnv converts a flat label set with dotted keys into a nested map
|
||||||
|
// structure for expr-lang evaluation. When both a leaf and a deeper nested path exist
|
||||||
|
// (e.g. "foo" and "foo.bar"), the nested structure takes precedence.
|
||||||
|
func convertLabelSetToEnv(labelSet model.LabelSet) map[string]interface{} {
|
||||||
|
env := make(map[string]interface{})
|
||||||
|
|
||||||
|
for lk, lv := range labelSet {
|
||||||
|
key := strings.TrimSpace(string(lk))
|
||||||
|
value := string(lv)
|
||||||
|
|
||||||
|
if strings.Contains(key, ".") {
|
||||||
|
parts := strings.Split(key, ".")
|
||||||
|
current := env
|
||||||
|
|
||||||
|
for i, raw := range parts {
|
||||||
|
part := strings.TrimSpace(raw)
|
||||||
|
|
||||||
|
last := i == len(parts)-1
|
||||||
|
if last {
|
||||||
|
if _, isMap := current[part].(map[string]interface{}); isMap {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
current[part] = value
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if nextMap, ok := current[part].(map[string]interface{}); ok {
|
||||||
|
current = nextMap
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
newMap := make(map[string]interface{})
|
||||||
|
current[part] = newMap
|
||||||
|
current = newMap
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, isMap := env[key].(map[string]interface{}); isMap {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
env[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
return env
|
||||||
|
}
|
||||||
|
|
||||||
|
// evaluateExpr compiles and runs an expr-lang expression against the given label set.
|
||||||
|
func evaluateExpr(expression string, labelSet model.LabelSet) (bool, error) {
|
||||||
|
env := convertLabelSetToEnv(labelSet)
|
||||||
|
|
||||||
|
program, err := expr.Compile(expression, expr.Env(env))
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
output, err := expr.Run(program, env)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if boolVal, ok := output.(bool); ok {
|
||||||
|
return boolVal, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// activeMaintenanceExpr holds an active maintenance's scoping criteria.
|
||||||
|
// Muting logic: (ruleIDs match OR ruleIDs empty) AND (expression match OR expression empty).
|
||||||
|
type activeMaintenanceExpr struct {
|
||||||
|
ruleIDs []string
|
||||||
|
expression string
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaintenanceExprMuter implements types.Muter for expression-based maintenance scoping.
|
||||||
|
// It evaluates expr-lang expressions against alert labels to determine if an alert
|
||||||
|
// should be muted (suppressed) during a maintenance window.
|
||||||
|
type MaintenanceExprMuter struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
expressions []activeMaintenanceExpr
|
||||||
|
logger *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMaintenanceExprMuter creates a new MaintenanceExprMuter.
|
||||||
|
func NewMaintenanceExprMuter(logger *slog.Logger) *MaintenanceExprMuter {
|
||||||
|
return &MaintenanceExprMuter{
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mutes returns true if the given label set matches any active maintenance entry.
|
||||||
|
// Each entry uses AND logic: (ruleIDs match OR empty) AND (expression match OR empty).
|
||||||
|
// Empty ruleIDs means all rules are in scope. Empty expression means all labels match.
|
||||||
|
func (m *MaintenanceExprMuter) Mutes(labels model.LabelSet) bool {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
|
||||||
|
for _, ae := range m.expressions {
|
||||||
|
// Check rule scope: empty ruleIDs means all rules match.
|
||||||
|
ruleMatch := len(ae.ruleIDs) == 0
|
||||||
|
if !ruleMatch {
|
||||||
|
alertRuleID := string(labels["ruleId"])
|
||||||
|
for _, rid := range ae.ruleIDs {
|
||||||
|
if rid == alertRuleID {
|
||||||
|
ruleMatch = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ruleMatch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check expression scope: empty expression means all labels match.
|
||||||
|
if ae.expression == "" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
matched, err := evaluateExpr(ae.expression, labels)
|
||||||
|
if err != nil {
|
||||||
|
m.logger.Error("failed to evaluate maintenance expression",
|
||||||
|
"expression", ae.expression,
|
||||||
|
"error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if matched {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetActiveExpressions updates the list of active maintenance expressions.
|
||||||
|
func (m *MaintenanceExprMuter) SetActiveExpressions(exprs []activeMaintenanceExpr) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.expressions = exprs
|
||||||
|
}
|
||||||
@@ -10,12 +10,14 @@ import (
|
|||||||
amConfig "github.com/prometheus/alertmanager/config"
|
amConfig "github.com/prometheus/alertmanager/config"
|
||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||||
|
"github.com/SigNoz/signoz/pkg/alertmanager/alertmanagerstore/clickhousealertmanagerstore"
|
||||||
"github.com/SigNoz/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
|
"github.com/SigNoz/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
|
||||||
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager"
|
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager"
|
||||||
"github.com/SigNoz/signoz/pkg/errors"
|
"github.com/SigNoz/signoz/pkg/errors"
|
||||||
"github.com/SigNoz/signoz/pkg/factory"
|
"github.com/SigNoz/signoz/pkg/factory"
|
||||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
"github.com/SigNoz/signoz/pkg/types"
|
"github.com/SigNoz/signoz/pkg/types"
|
||||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||||
@@ -23,25 +25,34 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type provider struct {
|
type provider struct {
|
||||||
service *alertmanager.Service
|
service *alertmanager.Service
|
||||||
config alertmanager.Config
|
config alertmanager.Config
|
||||||
settings factory.ScopedProviderSettings
|
settings factory.ScopedProviderSettings
|
||||||
configStore alertmanagertypes.ConfigStore
|
configStore alertmanagertypes.ConfigStore
|
||||||
stateStore alertmanagertypes.StateStore
|
stateStore alertmanagertypes.StateStore
|
||||||
notificationManager nfmanager.NotificationManager
|
notificationManager nfmanager.NotificationManager
|
||||||
stopC chan struct{}
|
maintenanceStore alertmanagertypes.MaintenanceStore
|
||||||
|
maintenanceExprMuter *MaintenanceExprMuter
|
||||||
|
orgGetter organization.Getter
|
||||||
|
stopC chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFactory(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationManager nfmanager.NotificationManager) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
func NewFactory(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationManager nfmanager.NotificationManager, telemetryStore telemetrystore.TelemetryStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||||
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
|
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
|
||||||
return New(ctx, settings, config, sqlstore, orgGetter, notificationManager)
|
return New(ctx, settings, config, sqlstore, orgGetter, notificationManager, telemetryStore)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationManager nfmanager.NotificationManager) (*provider, error) {
|
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationManager nfmanager.NotificationManager, telemetryStore telemetrystore.TelemetryStore) (*provider, error) {
|
||||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager")
|
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager")
|
||||||
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
|
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
|
||||||
stateStore := sqlalertmanagerstore.NewStateStore(sqlstore)
|
stateStore := sqlalertmanagerstore.NewStateStore(sqlstore)
|
||||||
|
maintenanceExprMuter := NewMaintenanceExprMuter(settings.Logger())
|
||||||
|
|
||||||
|
var stateHistoryStore alertmanagertypes.StateHistoryStore
|
||||||
|
if telemetryStore != nil {
|
||||||
|
stateHistoryStore = clickhousealertmanagerstore.NewStateHistoryStore(telemetryStore.ClickhouseDB())
|
||||||
|
}
|
||||||
|
|
||||||
p := &provider{
|
p := &provider{
|
||||||
service: alertmanager.New(
|
service: alertmanager.New(
|
||||||
@@ -52,13 +63,18 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
|
|||||||
configStore,
|
configStore,
|
||||||
orgGetter,
|
orgGetter,
|
||||||
notificationManager,
|
notificationManager,
|
||||||
|
maintenanceExprMuter,
|
||||||
|
stateHistoryStore,
|
||||||
),
|
),
|
||||||
settings: settings,
|
settings: settings,
|
||||||
config: config,
|
config: config,
|
||||||
configStore: configStore,
|
configStore: configStore,
|
||||||
stateStore: stateStore,
|
stateStore: stateStore,
|
||||||
notificationManager: notificationManager,
|
notificationManager: notificationManager,
|
||||||
stopC: make(chan struct{}),
|
maintenanceStore: sqlalertmanagerstore.NewMaintenanceStore(sqlstore),
|
||||||
|
maintenanceExprMuter: maintenanceExprMuter,
|
||||||
|
orgGetter: orgGetter,
|
||||||
|
stopC: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
@@ -70,16 +86,28 @@ func (provider *provider) Start(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
ticker := time.NewTicker(provider.config.Signoz.PollInterval)
|
// Initial maintenance sync before entering the ticker loop.
|
||||||
defer ticker.Stop()
|
provider.syncMaintenance(ctx, provider.maintenanceExprMuter)
|
||||||
|
|
||||||
|
// Start background sweep for stale alerts in state history tracking.
|
||||||
|
provider.service.StartStateHistorySweep(ctx)
|
||||||
|
|
||||||
|
serverTicker := time.NewTicker(provider.config.Signoz.PollInterval)
|
||||||
|
defer serverTicker.Stop()
|
||||||
|
|
||||||
|
maintenanceTicker := time.NewTicker(maintenanceSyncInterval)
|
||||||
|
defer maintenanceTicker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-provider.stopC:
|
case <-provider.stopC:
|
||||||
return nil
|
return nil
|
||||||
case <-ticker.C:
|
case <-serverTicker.C:
|
||||||
if err := provider.service.SyncServers(ctx); err != nil {
|
if err := provider.service.SyncServers(ctx); err != nil {
|
||||||
provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err)
|
provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err)
|
||||||
}
|
}
|
||||||
|
case <-maintenanceTicker.C:
|
||||||
|
provider.syncMaintenance(ctx, provider.maintenanceExprMuter)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -561,3 +589,89 @@ func (provider *provider) DeleteAllInhibitRulesByRuleId(ctx context.Context, org
|
|||||||
|
|
||||||
return provider.configStore.Set(ctx, config)
|
return provider.configStore.Set(ctx, config)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (provider *provider) GetAllPlannedMaintenance(ctx context.Context, orgID string) ([]*alertmanagertypes.GettablePlannedMaintenance, error) {
|
||||||
|
return provider.maintenanceStore.GetAllPlannedMaintenance(ctx, orgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) GetPlannedMaintenanceByID(ctx context.Context, id valuer.UUID) (*alertmanagertypes.GettablePlannedMaintenance, error) {
|
||||||
|
return provider.maintenanceStore.GetPlannedMaintenanceByID(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) CreatePlannedMaintenance(ctx context.Context, maintenance alertmanagertypes.GettablePlannedMaintenance) (valuer.UUID, error) {
|
||||||
|
return provider.maintenanceStore.CreatePlannedMaintenance(ctx, maintenance)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) EditPlannedMaintenance(ctx context.Context, maintenance alertmanagertypes.GettablePlannedMaintenance, id valuer.UUID) error {
|
||||||
|
return provider.maintenanceStore.EditPlannedMaintenance(ctx, maintenance, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) DeletePlannedMaintenance(ctx context.Context, id valuer.UUID) error {
|
||||||
|
return provider.maintenanceStore.DeletePlannedMaintenance(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) RecordRuleStateHistory(ctx context.Context, orgID string, entries []alertmanagertypes.RuleStateHistory) error {
|
||||||
|
return provider.service.RecordRuleStateHistory(ctx, orgID, entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]alertmanagertypes.RuleStateHistory, error) {
|
||||||
|
return provider.service.GetLastSavedRuleStateHistory(ctx, ruleID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) GetRuleStateHistoryTimeline(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStateTimeline, error) {
|
||||||
|
return provider.service.GetRuleStateHistoryTimeline(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) GetRuleStateHistoryTopContributors(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateHistoryContributor, error) {
|
||||||
|
return provider.service.GetRuleStateHistoryTopContributors(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) GetOverallStateTransitions(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) ([]alertmanagertypes.RuleStateTransition, error) {
|
||||||
|
return provider.service.GetOverallStateTransitions(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *provider) GetRuleStats(ctx context.Context, orgID string, ruleID string, params *alertmanagertypes.QueryRuleStateHistory) (*alertmanagertypes.RuleStats, error) {
|
||||||
|
return provider.service.GetRuleStats(ctx, orgID, ruleID, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
maintenanceSyncInterval = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// syncMaintenance checks planned maintenance windows and updates the given
|
||||||
|
// MaintenanceExprMuter with active maintenance entries. The muter is injected
|
||||||
|
// into the notification pipeline as a MuteStage, suppressing notifications
|
||||||
|
// while allowing rules to continue evaluating (preserving state history).
|
||||||
|
func (provider *provider) syncMaintenance(ctx context.Context, muter *MaintenanceExprMuter) {
|
||||||
|
orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
|
||||||
|
if err != nil {
|
||||||
|
provider.settings.Logger().ErrorContext(ctx, "failed to list orgs for maintenance sync", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
var activeExprs []activeMaintenanceExpr
|
||||||
|
|
||||||
|
for _, org := range orgs {
|
||||||
|
orgID := org.ID.StringValue()
|
||||||
|
maintenanceList, err := provider.maintenanceStore.GetAllPlannedMaintenance(ctx, orgID)
|
||||||
|
if err != nil {
|
||||||
|
provider.settings.Logger().ErrorContext(ctx, "failed to get planned maintenance for sync", "orgID", orgID, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, maint := range maintenanceList {
|
||||||
|
_, active := maint.CurrentWindowEndTime(now)
|
||||||
|
if !active {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
activeExprs = append(activeExprs, activeMaintenanceExpr{
|
||||||
|
ruleIDs: maint.RuleIDs,
|
||||||
|
expression: maint.Expression,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
muter.SetActiveExpressions(activeExprs)
|
||||||
|
}
|
||||||
|
|||||||
216
pkg/alertmanager/signozalertmanager/provider_test.go
Normal file
216
pkg/alertmanager/signozalertmanager/provider_test.go
Normal file
@@ -0,0 +1,216 @@
|
|||||||
|
package signozalertmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log/slog"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMaintenanceExprMuter(t *testing.T) {
|
||||||
|
logger := slog.New(slog.DiscardHandler)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
exprs []activeMaintenanceExpr
|
||||||
|
labels model.LabelSet
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
// --- no maintenance ---
|
||||||
|
{
|
||||||
|
name: "no expressions - not muted",
|
||||||
|
exprs: nil,
|
||||||
|
labels: model.LabelSet{"env": "prod"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
// --- expression only (ruleIDs empty = all rules) ---
|
||||||
|
{
|
||||||
|
name: "expression only - matching",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `env == "prod"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"env": "prod"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "expression only - non-matching",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `env == "prod"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"env": "staging"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "expression only - matches regardless of ruleId label",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `env == "prod"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"env": "prod", "ruleId": "any-rule"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
// --- ruleIDs only (expression empty = all labels) ---
|
||||||
|
{
|
||||||
|
name: "ruleIDs only - matching rule",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{ruleIDs: []string{"rule-1", "rule-2"}},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"ruleId": "rule-1", "env": "prod"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ruleIDs only - non-matching rule",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{ruleIDs: []string{"rule-1", "rule-2"}},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"ruleId": "rule-3", "env": "prod"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ruleIDs only - no ruleId label on alert",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{ruleIDs: []string{"rule-1"}},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"env": "prod"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
// --- ruleIDs AND expression ---
|
||||||
|
{
|
||||||
|
name: "ruleIDs AND expression - both match",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{ruleIDs: []string{"rule-1"}, expression: `severity == "critical"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"ruleId": "rule-1", "severity": "critical"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ruleIDs AND expression - rule matches, expression does not",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{ruleIDs: []string{"rule-1"}, expression: `severity == "critical"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"ruleId": "rule-1", "severity": "warning"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ruleIDs AND expression - expression matches, rule does not",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{ruleIDs: []string{"rule-1"}, expression: `severity == "critical"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"ruleId": "rule-999", "severity": "critical"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ruleIDs AND expression - neither matches",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{ruleIDs: []string{"rule-1"}, expression: `severity == "critical"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"ruleId": "rule-999", "severity": "warning"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
// --- catch-all (both empty) ---
|
||||||
|
{
|
||||||
|
name: "catch-all - empty ruleIDs and empty expression mutes everything",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"ruleId": "any-rule", "env": "anything"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
// --- multiple expressions ---
|
||||||
|
{
|
||||||
|
name: "multiple entries - first matches",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `env == "prod"`},
|
||||||
|
{expression: `env == "staging"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"env": "prod"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple entries - second matches",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `env == "staging"`},
|
||||||
|
{expression: `env == "prod"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"env": "prod"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple entries - none match",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `env == "staging"`},
|
||||||
|
{expression: `env == "dev"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"env": "prod"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple entries - ruleIDs entry matches, expression entry does not",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{ruleIDs: []string{"rule-1"}},
|
||||||
|
{expression: `env == "staging"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"ruleId": "rule-1", "env": "prod"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
// --- complex expressions ---
|
||||||
|
{
|
||||||
|
name: "complex expression with AND",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `severity == "critical" && env == "prod"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"severity": "critical", "env": "prod"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "complex expression with AND - partial match",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `severity == "critical" && env == "prod"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"severity": "warning", "env": "prod"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "expression with OR logic",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `env == "prod" || env == "staging"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"env": "staging"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "expression with nested label (dotted key)",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `labels.env == "prod"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"labels.env": "prod"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
// --- ruleId as expression (user can also match ruleId via expression) ---
|
||||||
|
{
|
||||||
|
name: "expression matching specific ruleId label",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `ruleId == "rule-1"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"ruleId": "rule-1", "env": "prod"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "expression matching specific ruleId label - non-matching",
|
||||||
|
exprs: []activeMaintenanceExpr{
|
||||||
|
{expression: `ruleId == "rule-1"`},
|
||||||
|
},
|
||||||
|
labels: model.LabelSet{"ruleId": "rule-3", "env": "prod"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
muter := NewMaintenanceExprMuter(logger)
|
||||||
|
muter.SetActiveExpressions(tc.exprs)
|
||||||
|
got := muter.Mutes(tc.labels)
|
||||||
|
assert.Equal(t, tc.want, got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
263
pkg/alertmanager/statehistory.go
Normal file
263
pkg/alertmanager/statehistory.go
Normal file
@@ -0,0 +1,263 @@
|
|||||||
|
package alertmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||||
|
"github.com/prometheus/alertmanager/types"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
// trackedAlert represents the last known state of a single alert series.
|
||||||
|
type trackedAlert struct {
|
||||||
|
state string // "firing" or "inactive"
|
||||||
|
labels string // JSON labels
|
||||||
|
ruleName string
|
||||||
|
value float64
|
||||||
|
lastSeen time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// ruleOverallState tracks the overall state of a rule across all its alert series.
|
||||||
|
type ruleOverallState struct {
|
||||||
|
state string // "firing" or "inactive"
|
||||||
|
}
|
||||||
|
|
||||||
|
// stateTracker maintains per-org, per-rule, per-fingerprint alert state
|
||||||
|
// to detect state transitions when PutAlerts is called.
|
||||||
|
type stateTracker struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
alerts map[string]map[string]map[uint64]*trackedAlert // orgID → ruleID → fingerprint → state
|
||||||
|
overallState map[string]map[string]*ruleOverallState // orgID → ruleID → overall state
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStateTracker() *stateTracker {
|
||||||
|
return &stateTracker{
|
||||||
|
alerts: make(map[string]map[string]map[uint64]*trackedAlert),
|
||||||
|
overallState: make(map[string]map[string]*ruleOverallState),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// processAlerts detects state transitions from incoming alerts and returns
|
||||||
|
// RuleStateHistory entries for transitions only.
|
||||||
|
func (t *stateTracker) processAlerts(orgID string, alerts []*types.Alert, now time.Time) []alertmanagertypes.RuleStateHistory {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
|
||||||
|
if _, ok := t.alerts[orgID]; !ok {
|
||||||
|
t.alerts[orgID] = make(map[string]map[uint64]*trackedAlert)
|
||||||
|
}
|
||||||
|
if _, ok := t.overallState[orgID]; !ok {
|
||||||
|
t.overallState[orgID] = make(map[string]*ruleOverallState)
|
||||||
|
}
|
||||||
|
|
||||||
|
var entries []alertmanagertypes.RuleStateHistory
|
||||||
|
|
||||||
|
// Track which rules were affected in this batch for overall_state computation.
|
||||||
|
affectedRules := make(map[string]bool)
|
||||||
|
|
||||||
|
for _, alert := range alerts {
|
||||||
|
ruleID := string(alert.Labels[model.LabelName("ruleId")])
|
||||||
|
if ruleID == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fp := uint64(alert.Fingerprint())
|
||||||
|
ruleName := string(alert.Labels[model.LabelName("alertname")])
|
||||||
|
labelsJSON := labelsToJSON(alert.Labels)
|
||||||
|
value := valueFromAnnotations(alert.Annotations)
|
||||||
|
|
||||||
|
var newState string
|
||||||
|
if !alert.EndsAt.IsZero() && !alert.EndsAt.After(now) {
|
||||||
|
newState = "inactive"
|
||||||
|
} else {
|
||||||
|
newState = "firing"
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := t.alerts[orgID][ruleID]; !ok {
|
||||||
|
t.alerts[orgID][ruleID] = make(map[uint64]*trackedAlert)
|
||||||
|
}
|
||||||
|
|
||||||
|
tracked, exists := t.alerts[orgID][ruleID][fp]
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
// First time seeing this alert.
|
||||||
|
t.alerts[orgID][ruleID][fp] = &trackedAlert{
|
||||||
|
state: newState,
|
||||||
|
labels: labelsJSON,
|
||||||
|
ruleName: ruleName,
|
||||||
|
value: value,
|
||||||
|
lastSeen: now,
|
||||||
|
}
|
||||||
|
if newState == "firing" {
|
||||||
|
// New firing alert — record transition.
|
||||||
|
entries = append(entries, alertmanagertypes.RuleStateHistory{
|
||||||
|
OrgID: orgID,
|
||||||
|
RuleID: ruleID,
|
||||||
|
RuleName: ruleName,
|
||||||
|
State: "firing",
|
||||||
|
StateChanged: true,
|
||||||
|
UnixMilli: now.UnixMilli(),
|
||||||
|
Labels: labelsJSON,
|
||||||
|
Fingerprint: fp,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
affectedRules[ruleID] = true
|
||||||
|
}
|
||||||
|
// Not found + resolved: no-op (we didn't track it firing).
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Alert exists in tracker — check for transition.
|
||||||
|
tracked.lastSeen = now
|
||||||
|
tracked.value = value
|
||||||
|
tracked.labels = labelsJSON
|
||||||
|
|
||||||
|
if tracked.state != newState {
|
||||||
|
// State transition detected.
|
||||||
|
tracked.state = newState
|
||||||
|
entries = append(entries, alertmanagertypes.RuleStateHistory{
|
||||||
|
OrgID: orgID,
|
||||||
|
RuleID: ruleID,
|
||||||
|
RuleName: ruleName,
|
||||||
|
State: newState,
|
||||||
|
StateChanged: true,
|
||||||
|
UnixMilli: now.UnixMilli(),
|
||||||
|
Labels: labelsJSON,
|
||||||
|
Fingerprint: fp,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
affectedRules[ruleID] = true
|
||||||
|
}
|
||||||
|
// Same state — no transition, nothing to record.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute overall_state for affected rules and set on entries.
|
||||||
|
for ruleID := range affectedRules {
|
||||||
|
currentOverall := t.computeOverallState(orgID, ruleID)
|
||||||
|
prevOverall, hasPrev := t.overallState[orgID][ruleID]
|
||||||
|
|
||||||
|
overallChanged := !hasPrev || prevOverall.state != currentOverall
|
||||||
|
if !hasPrev {
|
||||||
|
t.overallState[orgID][ruleID] = &ruleOverallState{state: currentOverall}
|
||||||
|
} else {
|
||||||
|
prevOverall.state = currentOverall
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set overall_state on all entries for this rule.
|
||||||
|
for i := range entries {
|
||||||
|
if entries[i].RuleID == ruleID {
|
||||||
|
entries[i].OverallState = currentOverall
|
||||||
|
entries[i].OverallStateChanged = overallChanged
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return entries
|
||||||
|
}
|
||||||
|
|
||||||
|
// computeOverallState returns "firing" if any tracked alert for the rule is firing.
|
||||||
|
func (t *stateTracker) computeOverallState(orgID, ruleID string) string {
|
||||||
|
ruleAlerts, ok := t.alerts[orgID][ruleID]
|
||||||
|
if !ok {
|
||||||
|
return "inactive"
|
||||||
|
}
|
||||||
|
for _, a := range ruleAlerts {
|
||||||
|
if a.state == "firing" {
|
||||||
|
return "firing"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "inactive"
|
||||||
|
}
|
||||||
|
|
||||||
|
// sweepStale finds alerts that haven't been updated within staleTimeout and
|
||||||
|
// records them as resolved. Returns transition entries grouped by orgID.
|
||||||
|
func (t *stateTracker) sweepStale(staleTimeout time.Duration, now time.Time) map[string][]alertmanagertypes.RuleStateHistory {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
|
||||||
|
result := make(map[string][]alertmanagertypes.RuleStateHistory)
|
||||||
|
affectedRules := make(map[string]map[string]bool) // orgID → ruleID → true
|
||||||
|
|
||||||
|
for orgID, rules := range t.alerts {
|
||||||
|
for ruleID, fingerprints := range rules {
|
||||||
|
for fp, tracked := range fingerprints {
|
||||||
|
if tracked.state != "firing" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if now.Sub(tracked.lastSeen) <= staleTimeout {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stale firing alert — mark as resolved.
|
||||||
|
tracked.state = "inactive"
|
||||||
|
result[orgID] = append(result[orgID], alertmanagertypes.RuleStateHistory{
|
||||||
|
OrgID: orgID,
|
||||||
|
RuleID: ruleID,
|
||||||
|
RuleName: tracked.ruleName,
|
||||||
|
State: "inactive",
|
||||||
|
StateChanged: true,
|
||||||
|
UnixMilli: now.UnixMilli(),
|
||||||
|
Labels: tracked.labels,
|
||||||
|
Fingerprint: fp,
|
||||||
|
Value: tracked.value,
|
||||||
|
})
|
||||||
|
|
||||||
|
if affectedRules[orgID] == nil {
|
||||||
|
affectedRules[orgID] = make(map[string]bool)
|
||||||
|
}
|
||||||
|
affectedRules[orgID][ruleID] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute overall_state for affected rules.
|
||||||
|
for orgID, rules := range affectedRules {
|
||||||
|
for ruleID := range rules {
|
||||||
|
currentOverall := t.computeOverallState(orgID, ruleID)
|
||||||
|
prevOverall, hasPrev := t.overallState[orgID][ruleID]
|
||||||
|
|
||||||
|
overallChanged := !hasPrev || prevOverall.state != currentOverall
|
||||||
|
if hasPrev {
|
||||||
|
prevOverall.state = currentOverall
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range result[orgID] {
|
||||||
|
if result[orgID][i].RuleID == ruleID {
|
||||||
|
result[orgID][i].OverallState = currentOverall
|
||||||
|
result[orgID][i].OverallStateChanged = overallChanged
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// labelsToJSON converts a model.LabelSet to a JSON string.
|
||||||
|
func labelsToJSON(ls model.LabelSet) string {
|
||||||
|
m := make(map[string]string, len(ls))
|
||||||
|
for k, v := range ls {
|
||||||
|
m[string(k)] = string(v)
|
||||||
|
}
|
||||||
|
b, err := json.Marshal(m)
|
||||||
|
if err != nil {
|
||||||
|
return "{}"
|
||||||
|
}
|
||||||
|
return string(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// valueFromAnnotations extracts the metric value from alert annotations.
|
||||||
|
func valueFromAnnotations(annotations model.LabelSet) float64 {
|
||||||
|
valStr := string(annotations[model.LabelName("value")])
|
||||||
|
if valStr == "" {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
v, err := strconv.ParseFloat(valStr, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return v
|
||||||
|
}
|
||||||
328
pkg/alertmanager/statehistory_test.go
Normal file
328
pkg/alertmanager/statehistory_test.go
Normal file
@@ -0,0 +1,328 @@
|
|||||||
|
package alertmanager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/alertmanager/types"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func makeAlert(ruleID, alertname string, firing bool, now time.Time, extraLabels map[string]string) *types.Alert {
|
||||||
|
labels := model.LabelSet{
|
||||||
|
"ruleId": model.LabelValue(ruleID),
|
||||||
|
"alertname": model.LabelValue(alertname),
|
||||||
|
}
|
||||||
|
for k, v := range extraLabels {
|
||||||
|
labels[model.LabelName(k)] = model.LabelValue(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
alert := &types.Alert{
|
||||||
|
Alert: model.Alert{
|
||||||
|
Labels: labels,
|
||||||
|
Annotations: model.LabelSet{"value": "42.5"},
|
||||||
|
StartsAt: now.Add(-1 * time.Minute),
|
||||||
|
},
|
||||||
|
UpdatedAt: now,
|
||||||
|
}
|
||||||
|
|
||||||
|
if firing {
|
||||||
|
alert.EndsAt = now.Add(5 * time.Minute) // future = firing
|
||||||
|
} else {
|
||||||
|
alert.EndsAt = now.Add(-10 * time.Second) // past = resolved
|
||||||
|
}
|
||||||
|
|
||||||
|
return alert
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessAlerts_NewFiringAlert(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
alerts := []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, map[string]string{"host": "server-1"}),
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := tracker.processAlerts("org-1", alerts, now)
|
||||||
|
|
||||||
|
require.Len(t, entries, 1)
|
||||||
|
assert.Equal(t, "firing", entries[0].State)
|
||||||
|
assert.Equal(t, "rule-1", entries[0].RuleID)
|
||||||
|
assert.Equal(t, "HighCPU", entries[0].RuleName)
|
||||||
|
assert.Equal(t, "org-1", entries[0].OrgID)
|
||||||
|
assert.Equal(t, true, entries[0].StateChanged)
|
||||||
|
assert.Equal(t, 42.5, entries[0].Value)
|
||||||
|
assert.Equal(t, now.UnixMilli(), entries[0].UnixMilli)
|
||||||
|
assert.Equal(t, "firing", entries[0].OverallState)
|
||||||
|
assert.Equal(t, true, entries[0].OverallStateChanged)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessAlerts_StillFiringNoTransition(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
alerts := []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, map[string]string{"host": "server-1"}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// First call: new firing.
|
||||||
|
entries := tracker.processAlerts("org-1", alerts, now)
|
||||||
|
require.Len(t, entries, 1)
|
||||||
|
|
||||||
|
// Second call: still firing — no transition.
|
||||||
|
entries = tracker.processAlerts("org-1", alerts, now.Add(1*time.Minute))
|
||||||
|
assert.Empty(t, entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessAlerts_FiringThenResolved(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// First: fire the alert.
|
||||||
|
firingAlerts := []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, map[string]string{"host": "server-1"}),
|
||||||
|
}
|
||||||
|
entries := tracker.processAlerts("org-1", firingAlerts, now)
|
||||||
|
require.Len(t, entries, 1)
|
||||||
|
assert.Equal(t, "firing", entries[0].State)
|
||||||
|
|
||||||
|
// Second: resolve the alert.
|
||||||
|
resolvedAlerts := []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", false, now.Add(5*time.Minute), map[string]string{"host": "server-1"}),
|
||||||
|
}
|
||||||
|
entries = tracker.processAlerts("org-1", resolvedAlerts, now.Add(5*time.Minute))
|
||||||
|
require.Len(t, entries, 1)
|
||||||
|
assert.Equal(t, "inactive", entries[0].State)
|
||||||
|
assert.Equal(t, "rule-1", entries[0].RuleID)
|
||||||
|
assert.Equal(t, "inactive", entries[0].OverallState)
|
||||||
|
assert.Equal(t, true, entries[0].OverallStateChanged)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessAlerts_ResolvedWithoutPriorFiring(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// A resolved alert arriving without prior tracking should produce no entry.
|
||||||
|
alerts := []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", false, now, map[string]string{"host": "server-1"}),
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := tracker.processAlerts("org-1", alerts, now)
|
||||||
|
assert.Empty(t, entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessAlerts_ReFiring(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Fire.
|
||||||
|
entries := tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, map[string]string{"host": "server-1"}),
|
||||||
|
}, now)
|
||||||
|
require.Len(t, entries, 1)
|
||||||
|
assert.Equal(t, "firing", entries[0].State)
|
||||||
|
|
||||||
|
// Resolve.
|
||||||
|
entries = tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", false, now.Add(5*time.Minute), map[string]string{"host": "server-1"}),
|
||||||
|
}, now.Add(5*time.Minute))
|
||||||
|
require.Len(t, entries, 1)
|
||||||
|
assert.Equal(t, "inactive", entries[0].State)
|
||||||
|
|
||||||
|
// Re-fire.
|
||||||
|
entries = tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now.Add(10*time.Minute), map[string]string{"host": "server-1"}),
|
||||||
|
}, now.Add(10*time.Minute))
|
||||||
|
require.Len(t, entries, 1)
|
||||||
|
assert.Equal(t, "firing", entries[0].State)
|
||||||
|
assert.Equal(t, "firing", entries[0].OverallState)
|
||||||
|
assert.Equal(t, true, entries[0].OverallStateChanged)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessAlerts_OverallStateComputation(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Fire two series for the same rule.
|
||||||
|
entries := tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, map[string]string{"host": "server-1"}),
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, map[string]string{"host": "server-2"}),
|
||||||
|
}, now)
|
||||||
|
require.Len(t, entries, 2)
|
||||||
|
assert.Equal(t, "firing", entries[0].OverallState)
|
||||||
|
assert.Equal(t, "firing", entries[1].OverallState)
|
||||||
|
|
||||||
|
// Resolve only one series — overall should still be "firing".
|
||||||
|
entries = tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", false, now.Add(5*time.Minute), map[string]string{"host": "server-1"}),
|
||||||
|
}, now.Add(5*time.Minute))
|
||||||
|
require.Len(t, entries, 1)
|
||||||
|
assert.Equal(t, "inactive", entries[0].State)
|
||||||
|
assert.Equal(t, "firing", entries[0].OverallState)
|
||||||
|
assert.Equal(t, false, entries[0].OverallStateChanged) // still firing overall
|
||||||
|
|
||||||
|
// Resolve the second series — overall should transition to "inactive".
|
||||||
|
entries = tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", false, now.Add(6*time.Minute), map[string]string{"host": "server-2"}),
|
||||||
|
}, now.Add(6*time.Minute))
|
||||||
|
require.Len(t, entries, 1)
|
||||||
|
assert.Equal(t, "inactive", entries[0].State)
|
||||||
|
assert.Equal(t, "inactive", entries[0].OverallState)
|
||||||
|
assert.Equal(t, true, entries[0].OverallStateChanged) // transitioned to inactive
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessAlerts_MultipleRulesIndependent(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
entries := tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, map[string]string{"host": "server-1"}),
|
||||||
|
makeAlert("rule-2", "HighMem", true, now, map[string]string{"host": "server-1"}),
|
||||||
|
}, now)
|
||||||
|
|
||||||
|
require.Len(t, entries, 2)
|
||||||
|
// Each rule has its own overall state.
|
||||||
|
assert.Equal(t, "rule-1", entries[0].RuleID)
|
||||||
|
assert.Equal(t, "rule-2", entries[1].RuleID)
|
||||||
|
assert.Equal(t, "firing", entries[0].OverallState)
|
||||||
|
assert.Equal(t, "firing", entries[1].OverallState)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessAlerts_AlertWithoutRuleIDSkipped(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
alert := &types.Alert{
|
||||||
|
Alert: model.Alert{
|
||||||
|
Labels: model.LabelSet{"alertname": "NoRuleID"},
|
||||||
|
StartsAt: now.Add(-1 * time.Minute),
|
||||||
|
EndsAt: now.Add(5 * time.Minute),
|
||||||
|
},
|
||||||
|
UpdatedAt: now,
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := tracker.processAlerts("org-1", []*types.Alert{alert}, now)
|
||||||
|
assert.Empty(t, entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessAlerts_MultipleOrgs(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Org 1 fires.
|
||||||
|
entries1 := tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, nil),
|
||||||
|
}, now)
|
||||||
|
require.Len(t, entries1, 1)
|
||||||
|
assert.Equal(t, "org-1", entries1[0].OrgID)
|
||||||
|
|
||||||
|
// Org 2 fires same rule ID — independent tracking.
|
||||||
|
entries2 := tracker.processAlerts("org-2", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, nil),
|
||||||
|
}, now)
|
||||||
|
require.Len(t, entries2, 1)
|
||||||
|
assert.Equal(t, "org-2", entries2[0].OrgID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSweepStale_FiringAlertBecomesInactive(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Fire an alert.
|
||||||
|
tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, map[string]string{"host": "server-1"}),
|
||||||
|
}, now)
|
||||||
|
|
||||||
|
// Sweep with staleTimeout = 5 minutes, 10 minutes later.
|
||||||
|
result := tracker.sweepStale(5*time.Minute, now.Add(10*time.Minute))
|
||||||
|
|
||||||
|
require.Len(t, result["org-1"], 1)
|
||||||
|
assert.Equal(t, "inactive", result["org-1"][0].State)
|
||||||
|
assert.Equal(t, "rule-1", result["org-1"][0].RuleID)
|
||||||
|
assert.Equal(t, "inactive", result["org-1"][0].OverallState)
|
||||||
|
assert.Equal(t, true, result["org-1"][0].OverallStateChanged)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSweepStale_RecentAlertNotSwept(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Fire an alert.
|
||||||
|
tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, map[string]string{"host": "server-1"}),
|
||||||
|
}, now)
|
||||||
|
|
||||||
|
// Sweep with staleTimeout = 10 minutes, only 2 minutes later.
|
||||||
|
result := tracker.sweepStale(10*time.Minute, now.Add(2*time.Minute))
|
||||||
|
assert.Empty(t, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSweepStale_InactiveAlertNotSwept(t *testing.T) {
|
||||||
|
tracker := newStateTracker()
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Fire then resolve.
|
||||||
|
tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", true, now, nil),
|
||||||
|
}, now)
|
||||||
|
tracker.processAlerts("org-1", []*types.Alert{
|
||||||
|
makeAlert("rule-1", "HighCPU", false, now.Add(1*time.Minute), nil),
|
||||||
|
}, now.Add(1*time.Minute))
|
||||||
|
|
||||||
|
// Sweep much later — should produce nothing since alert is already inactive.
|
||||||
|
result := tracker.sweepStale(5*time.Minute, now.Add(30*time.Minute))
|
||||||
|
assert.Empty(t, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLabelsToJSON(t *testing.T) {
|
||||||
|
ls := model.LabelSet{
|
||||||
|
"alertname": "HighCPU",
|
||||||
|
"env": "prod",
|
||||||
|
}
|
||||||
|
|
||||||
|
result := labelsToJSON(ls)
|
||||||
|
|
||||||
|
// Parse back and verify.
|
||||||
|
parsed := labelsFromJSON(result)
|
||||||
|
require.NotNil(t, parsed)
|
||||||
|
assert.Equal(t, model.LabelValue("HighCPU"), parsed["alertname"])
|
||||||
|
assert.Equal(t, model.LabelValue("prod"), parsed["env"])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValueFromAnnotations(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
annotations model.LabelSet
|
||||||
|
want float64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "valid float",
|
||||||
|
annotations: model.LabelSet{"value": "42.5"},
|
||||||
|
want: 42.5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty value",
|
||||||
|
annotations: model.LabelSet{},
|
||||||
|
want: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid value",
|
||||||
|
annotations: model.LabelSet{"value": "not-a-number"},
|
||||||
|
want: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "scientific notation",
|
||||||
|
annotations: model.LabelSet{"value": "1.5E+02"},
|
||||||
|
want: 150,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
got := valueFromAnnotations(tc.annotations)
|
||||||
|
assert.Equal(t, tc.want, got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -26,6 +26,8 @@ import (
|
|||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/alertmanager"
|
"github.com/SigNoz/signoz/pkg/alertmanager"
|
||||||
"github.com/SigNoz/signoz/pkg/apis/fields"
|
"github.com/SigNoz/signoz/pkg/apis/fields"
|
||||||
|
"github.com/SigNoz/signoz/pkg/http/handler"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||||
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
|
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
|
||||||
"github.com/SigNoz/signoz/pkg/http/middleware"
|
"github.com/SigNoz/signoz/pkg/http/middleware"
|
||||||
"github.com/SigNoz/signoz/pkg/http/render"
|
"github.com/SigNoz/signoz/pkg/http/render"
|
||||||
@@ -492,18 +494,109 @@ func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
|
|||||||
func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
|
func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
|
||||||
router.HandleFunc("/api/v1/query_range", am.ViewAccess(aH.queryRangeMetrics)).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/query_range", am.ViewAccess(aH.queryRangeMetrics)).Methods(http.MethodGet)
|
||||||
router.HandleFunc("/api/v1/query", am.ViewAccess(aH.queryMetrics)).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/query", am.ViewAccess(aH.queryMetrics)).Methods(http.MethodGet)
|
||||||
router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.AlertmanagerAPI.ListChannels)).Methods(http.MethodGet)
|
router.Handle("/api/v1/channels", handler.New(am.ViewAccess(aH.AlertmanagerAPI.ListChannels), handler.OpenAPIDef{
|
||||||
router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.AlertmanagerAPI.GetChannelByID)).Methods(http.MethodGet)
|
ID: "ListChannels",
|
||||||
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.UpdateChannelByID)).Methods(http.MethodPut)
|
Tags: []string{"channels"},
|
||||||
router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.DeleteChannelByID)).Methods(http.MethodDelete)
|
Summary: "List notification channels",
|
||||||
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.AlertmanagerAPI.CreateChannel)).Methods(http.MethodPost)
|
Description: "Returns all notification channels for the organization.",
|
||||||
router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.AlertmanagerAPI.TestReceiver)).Methods(http.MethodPost)
|
Response: make([]*alertmanagertypes.Channel, 0),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
})).Methods(http.MethodGet)
|
||||||
|
router.Handle("/api/v1/channels/{id}", handler.New(am.ViewAccess(aH.AlertmanagerAPI.GetChannelByID), handler.OpenAPIDef{
|
||||||
|
ID: "GetChannelByID",
|
||||||
|
Tags: []string{"channels"},
|
||||||
|
Summary: "Get a notification channel",
|
||||||
|
Description: "Returns a single notification channel by ID.",
|
||||||
|
Response: new(alertmanagertypes.Channel),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||||
|
})).Methods(http.MethodGet)
|
||||||
|
router.Handle("/api/v1/channels/{id}", handler.New(am.AdminAccess(aH.AlertmanagerAPI.UpdateChannelByID), handler.OpenAPIDef{
|
||||||
|
ID: "UpdateChannelByID",
|
||||||
|
Tags: []string{"channels"},
|
||||||
|
Summary: "Update a notification channel",
|
||||||
|
Description: "Updates a notification channel by ID.",
|
||||||
|
SuccessStatusCode: http.StatusNoContent,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||||
|
})).Methods(http.MethodPut)
|
||||||
|
router.Handle("/api/v1/channels/{id}", handler.New(am.AdminAccess(aH.AlertmanagerAPI.DeleteChannelByID), handler.OpenAPIDef{
|
||||||
|
ID: "DeleteChannelByID",
|
||||||
|
Tags: []string{"channels"},
|
||||||
|
Summary: "Delete a notification channel",
|
||||||
|
Description: "Deletes a notification channel by ID.",
|
||||||
|
SuccessStatusCode: http.StatusNoContent,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||||
|
})).Methods(http.MethodDelete)
|
||||||
|
router.Handle("/api/v1/channels", handler.New(am.EditAccess(aH.AlertmanagerAPI.CreateChannel), handler.OpenAPIDef{
|
||||||
|
ID: "CreateChannel",
|
||||||
|
Tags: []string{"channels"},
|
||||||
|
Summary: "Create a notification channel",
|
||||||
|
Description: "Creates a new notification channel.",
|
||||||
|
Response: new(alertmanagertypes.Channel),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusCreated,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest},
|
||||||
|
})).Methods(http.MethodPost)
|
||||||
|
router.Handle("/api/v1/testChannel", handler.New(am.EditAccess(aH.AlertmanagerAPI.TestReceiver), handler.OpenAPIDef{
|
||||||
|
ID: "TestReceiver",
|
||||||
|
Tags: []string{"channels"},
|
||||||
|
Summary: "Test a notification channel",
|
||||||
|
Description: "Sends a test alert to a receiver configuration.",
|
||||||
|
SuccessStatusCode: http.StatusNoContent,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest},
|
||||||
|
})).Methods(http.MethodPost)
|
||||||
|
|
||||||
router.HandleFunc("/api/v1/route_policies", am.ViewAccess(aH.AlertmanagerAPI.GetAllRoutePolicies)).Methods(http.MethodGet)
|
router.Handle("/api/v1/route_policies", handler.New(am.ViewAccess(aH.AlertmanagerAPI.GetAllRoutePolicies), handler.OpenAPIDef{
|
||||||
router.HandleFunc("/api/v1/route_policies/{id}", am.ViewAccess(aH.AlertmanagerAPI.GetRoutePolicyByID)).Methods(http.MethodGet)
|
ID: "GetAllRoutePolicies",
|
||||||
router.HandleFunc("/api/v1/route_policies", am.AdminAccess(aH.AlertmanagerAPI.CreateRoutePolicy)).Methods(http.MethodPost)
|
Tags: []string{"route_policies"},
|
||||||
router.HandleFunc("/api/v1/route_policies/{id}", am.AdminAccess(aH.AlertmanagerAPI.DeleteRoutePolicyByID)).Methods(http.MethodDelete)
|
Summary: "List route policies",
|
||||||
router.HandleFunc("/api/v1/route_policies/{id}", am.AdminAccess(aH.AlertmanagerAPI.UpdateRoutePolicy)).Methods(http.MethodPut)
|
Description: "Returns all notification route policies.",
|
||||||
|
Response: make([]*alertmanagertypes.GettableRoutePolicy, 0),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
})).Methods(http.MethodGet)
|
||||||
|
router.Handle("/api/v1/route_policies/{id}", handler.New(am.ViewAccess(aH.AlertmanagerAPI.GetRoutePolicyByID), handler.OpenAPIDef{
|
||||||
|
ID: "GetRoutePolicyByID",
|
||||||
|
Tags: []string{"route_policies"},
|
||||||
|
Summary: "Get a route policy",
|
||||||
|
Description: "Returns a single notification route policy by ID.",
|
||||||
|
Response: new(alertmanagertypes.GettableRoutePolicy),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||||
|
})).Methods(http.MethodGet)
|
||||||
|
router.Handle("/api/v1/route_policies", handler.New(am.AdminAccess(aH.AlertmanagerAPI.CreateRoutePolicy), handler.OpenAPIDef{
|
||||||
|
ID: "CreateRoutePolicy",
|
||||||
|
Tags: []string{"route_policies"},
|
||||||
|
Summary: "Create a route policy",
|
||||||
|
Description: "Creates a new notification route policy.",
|
||||||
|
Request: new(alertmanagertypes.PostableRoutePolicy),
|
||||||
|
Response: new(alertmanagertypes.GettableRoutePolicy),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusCreated,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest},
|
||||||
|
})).Methods(http.MethodPost)
|
||||||
|
router.Handle("/api/v1/route_policies/{id}", handler.New(am.AdminAccess(aH.AlertmanagerAPI.DeleteRoutePolicyByID), handler.OpenAPIDef{
|
||||||
|
ID: "DeleteRoutePolicyByID",
|
||||||
|
Tags: []string{"route_policies"},
|
||||||
|
Summary: "Delete a route policy",
|
||||||
|
Description: "Deletes a notification route policy by ID.",
|
||||||
|
SuccessStatusCode: http.StatusNoContent,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||||
|
})).Methods(http.MethodDelete)
|
||||||
|
router.Handle("/api/v1/route_policies/{id}", handler.New(am.AdminAccess(aH.AlertmanagerAPI.UpdateRoutePolicy), handler.OpenAPIDef{
|
||||||
|
ID: "UpdateRoutePolicy",
|
||||||
|
Tags: []string{"route_policies"},
|
||||||
|
Summary: "Update a route policy",
|
||||||
|
Description: "Updates a notification route policy by ID.",
|
||||||
|
Request: new(alertmanagertypes.PostableRoutePolicy),
|
||||||
|
Response: new(alertmanagertypes.GettableRoutePolicy),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||||
|
})).Methods(http.MethodPut)
|
||||||
|
|
||||||
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.AlertmanagerAPI.GetAlerts)).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.AlertmanagerAPI.GetAlerts)).Methods(http.MethodGet)
|
||||||
|
|
||||||
@@ -525,6 +618,103 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
|
|||||||
router.HandleFunc("/api/v1/downtime_schedules/{id}", am.EditAccess(aH.editDowntimeSchedule)).Methods(http.MethodPut)
|
router.HandleFunc("/api/v1/downtime_schedules/{id}", am.EditAccess(aH.editDowntimeSchedule)).Methods(http.MethodPut)
|
||||||
router.HandleFunc("/api/v1/downtime_schedules/{id}", am.EditAccess(aH.deleteDowntimeSchedule)).Methods(http.MethodDelete)
|
router.HandleFunc("/api/v1/downtime_schedules/{id}", am.EditAccess(aH.deleteDowntimeSchedule)).Methods(http.MethodDelete)
|
||||||
|
|
||||||
|
// V2 downtime schedules (alertmanager-based)
|
||||||
|
router.Handle("/api/v2/downtime_schedules", handler.New(am.ViewAccess(aH.AlertmanagerAPI.ListDowntimeSchedules), handler.OpenAPIDef{
|
||||||
|
ID: "ListDowntimeSchedules",
|
||||||
|
Tags: []string{"downtime_schedules"},
|
||||||
|
Summary: "List downtime schedules",
|
||||||
|
Description: "Returns all planned maintenance schedules for the organization. Supports filtering by active and recurring query parameters.",
|
||||||
|
Response: make([]*alertmanagertypes.GettablePlannedMaintenance, 0),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
})).Methods(http.MethodGet)
|
||||||
|
router.Handle("/api/v2/downtime_schedules/{id}", handler.New(am.ViewAccess(aH.AlertmanagerAPI.GetDowntimeSchedule), handler.OpenAPIDef{
|
||||||
|
ID: "GetDowntimeSchedule",
|
||||||
|
Tags: []string{"downtime_schedules"},
|
||||||
|
Summary: "Get a downtime schedule",
|
||||||
|
Description: "Returns a single planned maintenance schedule by ID.",
|
||||||
|
Response: new(alertmanagertypes.GettablePlannedMaintenance),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||||
|
})).Methods(http.MethodGet)
|
||||||
|
router.Handle("/api/v2/downtime_schedules", handler.New(am.EditAccess(aH.AlertmanagerAPI.CreateDowntimeSchedule), handler.OpenAPIDef{
|
||||||
|
ID: "CreateDowntimeSchedule",
|
||||||
|
Tags: []string{"downtime_schedules"},
|
||||||
|
Summary: "Create a downtime schedule",
|
||||||
|
Description: "Creates a new planned maintenance schedule.",
|
||||||
|
Request: new(alertmanagertypes.GettablePlannedMaintenance),
|
||||||
|
Response: nil,
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest},
|
||||||
|
})).Methods(http.MethodPost)
|
||||||
|
router.Handle("/api/v2/downtime_schedules/{id}", handler.New(am.EditAccess(aH.AlertmanagerAPI.EditDowntimeSchedule), handler.OpenAPIDef{
|
||||||
|
ID: "EditDowntimeSchedule",
|
||||||
|
Tags: []string{"downtime_schedules"},
|
||||||
|
Summary: "Update a downtime schedule",
|
||||||
|
Description: "Updates an existing planned maintenance schedule by ID.",
|
||||||
|
Request: new(alertmanagertypes.GettablePlannedMaintenance),
|
||||||
|
Response: nil,
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||||
|
})).Methods(http.MethodPut)
|
||||||
|
router.Handle("/api/v2/downtime_schedules/{id}", handler.New(am.EditAccess(aH.AlertmanagerAPI.DeleteDowntimeSchedule), handler.OpenAPIDef{
|
||||||
|
ID: "DeleteDowntimeSchedule",
|
||||||
|
Tags: []string{"downtime_schedules"},
|
||||||
|
Summary: "Delete a downtime schedule",
|
||||||
|
Description: "Deletes a planned maintenance schedule by ID.",
|
||||||
|
SuccessStatusCode: http.StatusNoContent,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||||
|
})).Methods(http.MethodDelete)
|
||||||
|
|
||||||
|
// V2 rule state history (alertmanager-based)
|
||||||
|
router.Handle("/api/v2/rules/{id}/history/timeline", handler.New(am.ViewAccess(aH.AlertmanagerAPI.GetRuleStateHistoryTimeline), handler.OpenAPIDef{
|
||||||
|
ID: "GetRuleStateHistoryTimeline",
|
||||||
|
Tags: []string{"rule_state_history"},
|
||||||
|
Summary: "Get rule state history timeline",
|
||||||
|
Description: "Returns paginated state history entries for a rule within a time range, with optional state filter and distinct label keys for filter UI.",
|
||||||
|
Request: new(alertmanagertypes.QueryRuleStateHistory),
|
||||||
|
Response: new(alertmanagertypes.RuleStateTimeline),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest},
|
||||||
|
})).Methods(http.MethodPost)
|
||||||
|
router.Handle("/api/v2/rules/{id}/history/stats", handler.New(am.ViewAccess(aH.AlertmanagerAPI.GetRuleStats), handler.OpenAPIDef{
|
||||||
|
ID: "GetRuleStats",
|
||||||
|
Tags: []string{"rule_state_history"},
|
||||||
|
Summary: "Get rule trigger and resolution statistics",
|
||||||
|
Description: "Returns trigger counts and average resolution times for a rule, comparing the current time period against a previous period of equal length.",
|
||||||
|
Request: new(alertmanagertypes.QueryRuleStateHistory),
|
||||||
|
Response: new(alertmanagertypes.RuleStats),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest},
|
||||||
|
})).Methods(http.MethodPost)
|
||||||
|
router.Handle("/api/v2/rules/{id}/history/top_contributors", handler.New(am.ViewAccess(aH.AlertmanagerAPI.GetRuleStateHistoryTopContributors), handler.OpenAPIDef{
|
||||||
|
ID: "GetRuleStateHistoryTopContributors",
|
||||||
|
Tags: []string{"rule_state_history"},
|
||||||
|
Summary: "Get top contributing alert series",
|
||||||
|
Description: "Returns alert series (by fingerprint) that transitioned to firing most frequently for a rule within a time range, ranked by count.",
|
||||||
|
Request: new(alertmanagertypes.QueryRuleStateHistory),
|
||||||
|
Response: make([]alertmanagertypes.RuleStateHistoryContributor, 0),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest},
|
||||||
|
})).Methods(http.MethodPost)
|
||||||
|
router.Handle("/api/v2/rules/{id}/history/overall_status", handler.New(am.ViewAccess(aH.AlertmanagerAPI.GetOverallStateTransitions), handler.OpenAPIDef{
|
||||||
|
ID: "GetOverallStateTransitions",
|
||||||
|
Tags: []string{"rule_state_history"},
|
||||||
|
Summary: "Get overall state transition timeline",
|
||||||
|
Description: "Returns a timeline of contiguous firing and inactive periods for a rule within a time range, with gap-filling between transitions.",
|
||||||
|
Request: new(alertmanagertypes.QueryRuleStateHistory),
|
||||||
|
Response: make([]alertmanagertypes.RuleStateTransition, 0),
|
||||||
|
ResponseContentType: "application/json",
|
||||||
|
SuccessStatusCode: http.StatusOK,
|
||||||
|
ErrorStatusCodes: []int{http.StatusBadRequest},
|
||||||
|
})).Methods(http.MethodPost)
|
||||||
|
|
||||||
router.HandleFunc("/api/v1/dashboards", am.ViewAccess(aH.List)).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/dashboards", am.ViewAccess(aH.List)).Methods(http.MethodGet)
|
||||||
router.HandleFunc("/api/v1/dashboards", am.EditAccess(aH.Signoz.Handlers.Dashboard.Create)).Methods(http.MethodPost)
|
router.HandleFunc("/api/v1/dashboards", am.EditAccess(aH.Signoz.Handlers.Dashboard.Create)).Methods(http.MethodPost)
|
||||||
router.HandleFunc("/api/v1/dashboards/{id}", am.ViewAccess(aH.Get)).Methods(http.MethodGet)
|
router.HandleFunc("/api/v1/dashboards/{id}", am.ViewAccess(aH.Get)).Methods(http.MethodGet)
|
||||||
|
|||||||
@@ -478,15 +478,14 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(revisedItemsToAdd) > 0 && r.reader != nil {
|
if len(revisedItemsToAdd) > 0 {
|
||||||
zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
|
zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
|
||||||
|
|
||||||
entries := make([]model.RuleStateHistory, 0, len(revisedItemsToAdd))
|
entries := make([]model.RuleStateHistory, 0, len(revisedItemsToAdd))
|
||||||
for _, item := range revisedItemsToAdd {
|
for _, item := range revisedItemsToAdd {
|
||||||
entries = append(entries, item)
|
entries = append(entries, item)
|
||||||
}
|
}
|
||||||
err := r.reader.AddRuleStateHistory(ctx, entries)
|
if err := r.reader.AddRuleStateHistory(ctx, entries); err != nil {
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
|
zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ func TestNewHandlers(t *testing.T) {
|
|||||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
|
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
|
||||||
notificationManager := nfmanagertest.NewMock()
|
notificationManager := nfmanagertest.NewMock()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager)
|
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
tokenizer := tokenizertest.NewMockTokenizer(t)
|
tokenizer := tokenizertest.NewMockTokenizer(t)
|
||||||
emailing := emailingtest.New()
|
emailing := emailingtest.New()
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ func TestNewModules(t *testing.T) {
|
|||||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
|
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
|
||||||
notificationManager := nfmanagertest.NewMock()
|
notificationManager := nfmanagertest.NewMock()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager)
|
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
tokenizer := tokenizertest.NewMockTokenizer(t)
|
tokenizer := tokenizertest.NewMockTokenizer(t)
|
||||||
emailing := emailingtest.New()
|
emailing := emailingtest.New()
|
||||||
|
|||||||
@@ -166,6 +166,8 @@ func NewSQLMigrationProviderFactories(
|
|||||||
sqlmigration.NewAddAuthzIndexFactory(sqlstore, sqlschema),
|
sqlmigration.NewAddAuthzIndexFactory(sqlstore, sqlschema),
|
||||||
sqlmigration.NewMigrateRbacToAuthzFactory(sqlstore),
|
sqlmigration.NewMigrateRbacToAuthzFactory(sqlstore),
|
||||||
sqlmigration.NewMigratePublicDashboardsFactory(sqlstore),
|
sqlmigration.NewMigratePublicDashboardsFactory(sqlstore),
|
||||||
|
sqlmigration.NewCreatePlannedMaintenanceV2Factory(sqlstore),
|
||||||
|
sqlmigration.NewCreateRuleStateHistoryV2Factory(telemetryStore),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,9 +193,9 @@ func NewNotificationManagerProviderFactories(routeStore alertmanagertypes.RouteS
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, nfManager nfmanager.NotificationManager) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
|
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, nfManager nfmanager.NotificationManager, telemetryStore telemetrystore.TelemetryStore) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
|
||||||
return factory.MustNewNamedMap(
|
return factory.MustNewNamedMap(
|
||||||
signozalertmanager.NewFactory(sqlstore, orgGetter, nfManager),
|
signozalertmanager.NewFactory(sqlstore, orgGetter, nfManager, telemetryStore),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ func TestNewProviderFactories(t *testing.T) {
|
|||||||
assert.NotPanics(t, func() {
|
assert.NotPanics(t, func() {
|
||||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
|
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
|
||||||
notificationManager := nfmanagertest.NewMock()
|
notificationManager := nfmanagertest.NewMock()
|
||||||
NewAlertmanagerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), orgGetter, notificationManager)
|
NewAlertmanagerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), orgGetter, notificationManager, nil)
|
||||||
})
|
})
|
||||||
|
|
||||||
assert.NotPanics(t, func() {
|
assert.NotPanics(t, func() {
|
||||||
|
|||||||
@@ -311,7 +311,7 @@ func New(
|
|||||||
ctx,
|
ctx,
|
||||||
providerSettings,
|
providerSettings,
|
||||||
config.Alertmanager,
|
config.Alertmanager,
|
||||||
NewAlertmanagerProviderFactories(sqlstore, orgGetter, nfManager),
|
NewAlertmanagerProviderFactories(sqlstore, orgGetter, nfManager, telemetrystore),
|
||||||
config.Alertmanager.Provider,
|
config.Alertmanager.Provider,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
45
pkg/sqlmigration/063_add_maintenance_matchers.go
Normal file
45
pkg/sqlmigration/063_add_maintenance_matchers.go
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
package sqlmigration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/factory"
|
||||||
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||||
|
"github.com/uptrace/bun"
|
||||||
|
"github.com/uptrace/bun/migrate"
|
||||||
|
)
|
||||||
|
|
||||||
|
type createPlannedMaintenanceV2 struct {
|
||||||
|
sqlstore sqlstore.SQLStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCreatePlannedMaintenanceV2Factory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
|
||||||
|
return factory.NewProviderFactory(factory.MustNewName("create_planned_maintenance_v2"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) {
|
||||||
|
return &createPlannedMaintenanceV2{sqlstore: sqlstore}, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *createPlannedMaintenanceV2) Register(migrations *migrate.Migrations) error {
|
||||||
|
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *createPlannedMaintenanceV2) Up(ctx context.Context, db *bun.DB) error {
|
||||||
|
_, err := db.NewCreateTable().
|
||||||
|
Model((*alertmanagertypes.StorablePlannedMaintenance)(nil)).
|
||||||
|
IfNotExists().
|
||||||
|
Exec(ctx)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *createPlannedMaintenanceV2) Down(ctx context.Context, db *bun.DB) error {
|
||||||
|
_, err := db.NewDropTable().
|
||||||
|
Model((*alertmanagertypes.StorablePlannedMaintenance)(nil)).
|
||||||
|
IfExists().
|
||||||
|
Exec(ctx)
|
||||||
|
return err
|
||||||
|
}
|
||||||
80
pkg/sqlmigration/064_create_rule_state_history_v2.go
Normal file
80
pkg/sqlmigration/064_create_rule_state_history_v2.go
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
package sqlmigration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/factory"
|
||||||
|
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||||
|
"github.com/uptrace/bun"
|
||||||
|
"github.com/uptrace/bun/migrate"
|
||||||
|
)
|
||||||
|
|
||||||
|
type createRuleStateHistoryV2 struct {
|
||||||
|
telemetryStore telemetrystore.TelemetryStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCreateRuleStateHistoryV2Factory(telemetryStore telemetrystore.TelemetryStore) factory.ProviderFactory[SQLMigration, Config] {
|
||||||
|
return factory.NewProviderFactory(factory.MustNewName("create_rule_state_history_v2"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) {
|
||||||
|
return &createRuleStateHistoryV2{telemetryStore: telemetryStore}, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *createRuleStateHistoryV2) Register(migrations *migrate.Migrations) error {
|
||||||
|
if err := migrations.Register(migration.Up, migration.Down); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *createRuleStateHistoryV2) Up(ctx context.Context, db *bun.DB) error {
|
||||||
|
// Create the local MergeTree table.
|
||||||
|
if err := migration.telemetryStore.ClickhouseDB().Exec(ctx, `
|
||||||
|
CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history_v2
|
||||||
|
(
|
||||||
|
org_id LowCardinality(String),
|
||||||
|
rule_id String,
|
||||||
|
rule_name String,
|
||||||
|
fingerprint UInt64,
|
||||||
|
labels String,
|
||||||
|
state LowCardinality(String),
|
||||||
|
state_changed Bool DEFAULT true,
|
||||||
|
value Float64,
|
||||||
|
unix_milli Int64,
|
||||||
|
overall_state LowCardinality(String),
|
||||||
|
overall_state_changed Bool
|
||||||
|
)
|
||||||
|
ENGINE = MergeTree()
|
||||||
|
PARTITION BY toDate(unix_milli / 1000)
|
||||||
|
ORDER BY (org_id, rule_id, fingerprint, unix_milli)
|
||||||
|
TTL toDate(unix_milli / 1000) + INTERVAL 90 DAY
|
||||||
|
`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the distributed table.
|
||||||
|
if err := migration.telemetryStore.ClickhouseDB().Exec(ctx, `
|
||||||
|
CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history_v2
|
||||||
|
AS signoz_analytics.rule_state_history_v2
|
||||||
|
ENGINE = Distributed('cluster', 'signoz_analytics', 'rule_state_history_v2', cityHash64(rule_id))
|
||||||
|
`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (migration *createRuleStateHistoryV2) Down(ctx context.Context, db *bun.DB) error {
|
||||||
|
if err := migration.telemetryStore.ClickhouseDB().Exec(ctx, `
|
||||||
|
DROP TABLE IF EXISTS signoz_analytics.distributed_rule_state_history_v2
|
||||||
|
`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := migration.telemetryStore.ClickhouseDB().Exec(ctx, `
|
||||||
|
DROP TABLE IF EXISTS signoz_analytics.rule_state_history_v2
|
||||||
|
`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
445
pkg/types/alertmanagertypes/maintenance.go
Normal file
445
pkg/types/alertmanagertypes/maintenance.go
Normal file
@@ -0,0 +1,445 @@
|
|||||||
|
package alertmanagertypes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/errors"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types"
|
||||||
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
|
"github.com/expr-lang/expr"
|
||||||
|
"github.com/uptrace/bun"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrCodeInvalidPlannedMaintenancePayload = errors.MustNewCode("invalid_planned_maintenance_payload")
|
||||||
|
)
|
||||||
|
|
||||||
|
type StorablePlannedMaintenance struct {
|
||||||
|
bun.BaseModel `bun:"table:planned_maintenance_v2"`
|
||||||
|
types.Identifiable
|
||||||
|
types.TimeAuditable
|
||||||
|
types.UserAuditable
|
||||||
|
Name string `bun:"name,type:text,notnull"`
|
||||||
|
Description string `bun:"description,type:text"`
|
||||||
|
Schedule *Schedule `bun:"schedule,type:text,notnull"`
|
||||||
|
RuleIDs string `bun:"rule_ids,type:text"`
|
||||||
|
Expression string `bun:"expression,type:text"`
|
||||||
|
OrgID string `bun:"org_id,type:text"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GettablePlannedMaintenance struct {
|
||||||
|
Id string `json:"id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
Schedule *Schedule `json:"schedule"`
|
||||||
|
RuleIDs []string `json:"ruleIds,omitempty"`
|
||||||
|
Expression string `json:"expression,omitempty"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
CreatedBy string `json:"createdBy"`
|
||||||
|
UpdatedAt time.Time `json:"updatedAt"`
|
||||||
|
UpdatedBy string `json:"updatedBy"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Kind string `json:"kind"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *GettablePlannedMaintenance) IsActive(now time.Time) bool {
|
||||||
|
loc, err := time.LoadLocation(m.Schedule.Timezone)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
currentTime := now.In(loc)
|
||||||
|
|
||||||
|
// fixed schedule
|
||||||
|
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
|
||||||
|
startTime := m.Schedule.StartTime.In(loc)
|
||||||
|
endTime := m.Schedule.EndTime.In(loc)
|
||||||
|
if currentTime.Equal(startTime) || currentTime.Equal(endTime) ||
|
||||||
|
(currentTime.After(startTime) && currentTime.Before(endTime)) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// recurring schedule
|
||||||
|
if m.Schedule.Recurrence != nil {
|
||||||
|
start := m.Schedule.Recurrence.StartTime
|
||||||
|
|
||||||
|
// Make sure the recurrence has started
|
||||||
|
if currentTime.Before(start.In(loc)) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if recurrence has expired
|
||||||
|
if m.Schedule.Recurrence.EndTime != nil {
|
||||||
|
endTime := *m.Schedule.Recurrence.EndTime
|
||||||
|
if !endTime.IsZero() && currentTime.After(endTime.In(loc)) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
switch m.Schedule.Recurrence.RepeatType {
|
||||||
|
case RepeatTypeDaily:
|
||||||
|
return m.checkDaily(currentTime, m.Schedule.Recurrence, loc)
|
||||||
|
case RepeatTypeWeekly:
|
||||||
|
return m.checkWeekly(currentTime, m.Schedule.Recurrence, loc)
|
||||||
|
case RepeatTypeMonthly:
|
||||||
|
return m.checkMonthly(currentTime, m.Schedule.Recurrence, loc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkDaily rebases the recurrence start to today (or yesterday if needed)
|
||||||
|
// and returns true if currentTime is within [candidate, candidate+Duration].
|
||||||
|
func (m *GettablePlannedMaintenance) checkDaily(currentTime time.Time, rec *Recurrence, loc *time.Location) bool {
|
||||||
|
candidate := time.Date(
|
||||||
|
currentTime.Year(), currentTime.Month(), currentTime.Day(),
|
||||||
|
rec.StartTime.Hour(), rec.StartTime.Minute(), 0, 0,
|
||||||
|
loc,
|
||||||
|
)
|
||||||
|
if candidate.After(currentTime) {
|
||||||
|
candidate = candidate.AddDate(0, 0, -1)
|
||||||
|
}
|
||||||
|
return currentTime.Sub(candidate) <= time.Duration(rec.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkWeekly finds the most recent allowed occurrence by rebasing the recurrence's
|
||||||
|
// time-of-day onto the allowed weekday. It does this for each allowed day and returns true
|
||||||
|
// if the current time falls within the candidate window.
|
||||||
|
func (m *GettablePlannedMaintenance) checkWeekly(currentTime time.Time, rec *Recurrence, loc *time.Location) bool {
|
||||||
|
// If no days specified, treat as every day (like daily).
|
||||||
|
if len(rec.RepeatOn) == 0 {
|
||||||
|
return m.checkDaily(currentTime, rec, loc)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, day := range rec.RepeatOn {
|
||||||
|
allowedDay, ok := RepeatOnAllMap[day]
|
||||||
|
if !ok {
|
||||||
|
continue // skip invalid days
|
||||||
|
}
|
||||||
|
// Compute the day difference: allowedDay - current weekday.
|
||||||
|
delta := int(allowedDay) - int(currentTime.Weekday())
|
||||||
|
// Build a candidate occurrence by rebasing today's date to the allowed weekday.
|
||||||
|
candidate := time.Date(
|
||||||
|
currentTime.Year(), currentTime.Month(), currentTime.Day(),
|
||||||
|
rec.StartTime.Hour(), rec.StartTime.Minute(), 0, 0,
|
||||||
|
loc,
|
||||||
|
).AddDate(0, 0, delta)
|
||||||
|
// If the candidate is in the future, subtract 7 days.
|
||||||
|
if candidate.After(currentTime) {
|
||||||
|
candidate = candidate.AddDate(0, 0, -7)
|
||||||
|
}
|
||||||
|
if currentTime.Sub(candidate) <= time.Duration(rec.Duration) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkMonthly rebases the candidate occurrence using the recurrence's day-of-month.
|
||||||
|
// If the candidate for the current month is in the future, it uses the previous month.
|
||||||
|
func (m *GettablePlannedMaintenance) checkMonthly(currentTime time.Time, rec *Recurrence, loc *time.Location) bool {
|
||||||
|
refDay := rec.StartTime.Day()
|
||||||
|
year, month, _ := currentTime.Date()
|
||||||
|
lastDay := time.Date(year, month+1, 0, 0, 0, 0, 0, loc).Day()
|
||||||
|
day := refDay
|
||||||
|
if refDay > lastDay {
|
||||||
|
day = lastDay
|
||||||
|
}
|
||||||
|
candidate := time.Date(year, month, day,
|
||||||
|
rec.StartTime.Hour(), rec.StartTime.Minute(), rec.StartTime.Second(), rec.StartTime.Nanosecond(),
|
||||||
|
loc,
|
||||||
|
)
|
||||||
|
if candidate.After(currentTime) {
|
||||||
|
// Use previous month.
|
||||||
|
candidate = candidate.AddDate(0, -1, 0)
|
||||||
|
y, m, _ := candidate.Date()
|
||||||
|
lastDayPrev := time.Date(y, m+1, 0, 0, 0, 0, 0, loc).Day()
|
||||||
|
if refDay > lastDayPrev {
|
||||||
|
candidate = time.Date(y, m, lastDayPrev,
|
||||||
|
rec.StartTime.Hour(), rec.StartTime.Minute(), rec.StartTime.Second(), rec.StartTime.Nanosecond(),
|
||||||
|
loc,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
candidate = time.Date(y, m, refDay,
|
||||||
|
rec.StartTime.Hour(), rec.StartTime.Minute(), rec.StartTime.Second(), rec.StartTime.Nanosecond(),
|
||||||
|
loc,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return currentTime.Sub(candidate) <= time.Duration(rec.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CurrentWindowEndTime returns the end time of the current active maintenance window.
|
||||||
|
// Returns zero time and false if the maintenance is not currently active.
|
||||||
|
func (m *GettablePlannedMaintenance) CurrentWindowEndTime(now time.Time) (time.Time, bool) {
|
||||||
|
loc, err := time.LoadLocation(m.Schedule.Timezone)
|
||||||
|
if err != nil {
|
||||||
|
return time.Time{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
currentTime := now.In(loc)
|
||||||
|
|
||||||
|
// fixed schedule
|
||||||
|
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
|
||||||
|
startTime := m.Schedule.StartTime.In(loc)
|
||||||
|
endTime := m.Schedule.EndTime.In(loc)
|
||||||
|
if currentTime.Equal(startTime) || currentTime.Equal(endTime) ||
|
||||||
|
(currentTime.After(startTime) && currentTime.Before(endTime)) {
|
||||||
|
return endTime, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// recurring schedule
|
||||||
|
if m.Schedule.Recurrence != nil {
|
||||||
|
start := m.Schedule.Recurrence.StartTime
|
||||||
|
if currentTime.Before(start.In(loc)) {
|
||||||
|
return time.Time{}, false
|
||||||
|
}
|
||||||
|
if m.Schedule.Recurrence.EndTime != nil {
|
||||||
|
endTime := *m.Schedule.Recurrence.EndTime
|
||||||
|
if !endTime.IsZero() && currentTime.After(endTime.In(loc)) {
|
||||||
|
return time.Time{}, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var candidate time.Time
|
||||||
|
var active bool
|
||||||
|
switch m.Schedule.Recurrence.RepeatType {
|
||||||
|
case RepeatTypeDaily:
|
||||||
|
candidate, active = m.currentDailyWindowEnd(currentTime, m.Schedule.Recurrence, loc)
|
||||||
|
case RepeatTypeWeekly:
|
||||||
|
candidate, active = m.currentWeeklyWindowEnd(currentTime, m.Schedule.Recurrence, loc)
|
||||||
|
case RepeatTypeMonthly:
|
||||||
|
candidate, active = m.currentMonthlyWindowEnd(currentTime, m.Schedule.Recurrence, loc)
|
||||||
|
}
|
||||||
|
if active {
|
||||||
|
return candidate, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Time{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *GettablePlannedMaintenance) currentDailyWindowEnd(currentTime time.Time, rec *Recurrence, loc *time.Location) (time.Time, bool) {
|
||||||
|
candidate := time.Date(
|
||||||
|
currentTime.Year(), currentTime.Month(), currentTime.Day(),
|
||||||
|
rec.StartTime.Hour(), rec.StartTime.Minute(), 0, 0,
|
||||||
|
loc,
|
||||||
|
)
|
||||||
|
if candidate.After(currentTime) {
|
||||||
|
candidate = candidate.AddDate(0, 0, -1)
|
||||||
|
}
|
||||||
|
endTime := candidate.Add(time.Duration(rec.Duration))
|
||||||
|
if currentTime.Before(endTime) || currentTime.Equal(endTime) {
|
||||||
|
return endTime, true
|
||||||
|
}
|
||||||
|
return time.Time{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *GettablePlannedMaintenance) currentWeeklyWindowEnd(currentTime time.Time, rec *Recurrence, loc *time.Location) (time.Time, bool) {
|
||||||
|
if len(rec.RepeatOn) == 0 {
|
||||||
|
return m.currentDailyWindowEnd(currentTime, rec, loc)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, day := range rec.RepeatOn {
|
||||||
|
allowedDay, ok := RepeatOnAllMap[day]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
delta := int(allowedDay) - int(currentTime.Weekday())
|
||||||
|
candidate := time.Date(
|
||||||
|
currentTime.Year(), currentTime.Month(), currentTime.Day(),
|
||||||
|
rec.StartTime.Hour(), rec.StartTime.Minute(), 0, 0,
|
||||||
|
loc,
|
||||||
|
).AddDate(0, 0, delta)
|
||||||
|
if candidate.After(currentTime) {
|
||||||
|
candidate = candidate.AddDate(0, 0, -7)
|
||||||
|
}
|
||||||
|
endTime := candidate.Add(time.Duration(rec.Duration))
|
||||||
|
if currentTime.Before(endTime) || currentTime.Equal(endTime) {
|
||||||
|
return endTime, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return time.Time{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *GettablePlannedMaintenance) currentMonthlyWindowEnd(currentTime time.Time, rec *Recurrence, loc *time.Location) (time.Time, bool) {
|
||||||
|
refDay := rec.StartTime.Day()
|
||||||
|
year, month, _ := currentTime.Date()
|
||||||
|
lastDay := time.Date(year, month+1, 0, 0, 0, 0, 0, loc).Day()
|
||||||
|
day := refDay
|
||||||
|
if refDay > lastDay {
|
||||||
|
day = lastDay
|
||||||
|
}
|
||||||
|
candidate := time.Date(year, month, day,
|
||||||
|
rec.StartTime.Hour(), rec.StartTime.Minute(), rec.StartTime.Second(), rec.StartTime.Nanosecond(),
|
||||||
|
loc,
|
||||||
|
)
|
||||||
|
if candidate.After(currentTime) {
|
||||||
|
candidate = candidate.AddDate(0, -1, 0)
|
||||||
|
y, m, _ := candidate.Date()
|
||||||
|
lastDayPrev := time.Date(y, m+1, 0, 0, 0, 0, 0, loc).Day()
|
||||||
|
if refDay > lastDayPrev {
|
||||||
|
candidate = time.Date(y, m, lastDayPrev,
|
||||||
|
rec.StartTime.Hour(), rec.StartTime.Minute(), rec.StartTime.Second(), rec.StartTime.Nanosecond(),
|
||||||
|
loc,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
candidate = time.Date(y, m, refDay,
|
||||||
|
rec.StartTime.Hour(), rec.StartTime.Minute(), rec.StartTime.Second(), rec.StartTime.Nanosecond(),
|
||||||
|
loc,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
endTime := candidate.Add(time.Duration(rec.Duration))
|
||||||
|
if currentTime.Before(endTime) || currentTime.Equal(endTime) {
|
||||||
|
return endTime, true
|
||||||
|
}
|
||||||
|
return time.Time{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *GettablePlannedMaintenance) IsUpcoming() bool {
|
||||||
|
loc, err := time.LoadLocation(m.Schedule.Timezone)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
now := time.Now().In(loc)
|
||||||
|
|
||||||
|
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
|
||||||
|
return now.Before(m.Schedule.StartTime)
|
||||||
|
}
|
||||||
|
if m.Schedule.Recurrence != nil {
|
||||||
|
return now.Before(m.Schedule.Recurrence.StartTime)
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *GettablePlannedMaintenance) IsRecurring() bool {
|
||||||
|
return m.Schedule.Recurrence != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *GettablePlannedMaintenance) Validate() error {
|
||||||
|
if m.Name == "" {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "missing name in the payload")
|
||||||
|
}
|
||||||
|
if m.Schedule == nil {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "missing schedule in the payload")
|
||||||
|
}
|
||||||
|
if m.Schedule.Timezone == "" {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "missing timezone in the payload")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := time.LoadLocation(m.Schedule.Timezone)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "invalid timezone in the payload")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
|
||||||
|
if m.Schedule.StartTime.After(m.Schedule.EndTime) {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "start time cannot be after end time")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Schedule.Recurrence != nil {
|
||||||
|
if m.Schedule.Recurrence.RepeatType == "" {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "missing repeat type in the payload")
|
||||||
|
}
|
||||||
|
if m.Schedule.Recurrence.Duration == 0 {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "missing duration in the payload")
|
||||||
|
}
|
||||||
|
if m.Schedule.Recurrence.EndTime != nil && m.Schedule.Recurrence.EndTime.Before(m.Schedule.Recurrence.StartTime) {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "end time cannot be before start time")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Expression != "" {
|
||||||
|
if _, err := expr.Compile(m.Expression); err != nil {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "invalid expression: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m GettablePlannedMaintenance) MarshalJSON() ([]byte, error) {
|
||||||
|
now := time.Now().In(time.FixedZone(m.Schedule.Timezone, 0))
|
||||||
|
var status string
|
||||||
|
if m.IsActive(now) {
|
||||||
|
status = "active"
|
||||||
|
} else if m.IsUpcoming() {
|
||||||
|
status = "upcoming"
|
||||||
|
} else {
|
||||||
|
status = "expired"
|
||||||
|
}
|
||||||
|
var kind string
|
||||||
|
|
||||||
|
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() && m.Schedule.EndTime.After(m.Schedule.StartTime) {
|
||||||
|
kind = "fixed"
|
||||||
|
} else {
|
||||||
|
kind = "recurring"
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(struct {
|
||||||
|
Id string `json:"id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
Schedule *Schedule `json:"schedule"`
|
||||||
|
RuleIDs []string `json:"ruleIds,omitempty"`
|
||||||
|
Expression string `json:"expression,omitempty"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
CreatedBy string `json:"createdBy"`
|
||||||
|
UpdatedAt time.Time `json:"updatedAt"`
|
||||||
|
UpdatedBy string `json:"updatedBy"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Kind string `json:"kind"`
|
||||||
|
}{
|
||||||
|
Id: m.Id,
|
||||||
|
Name: m.Name,
|
||||||
|
Description: m.Description,
|
||||||
|
Schedule: m.Schedule,
|
||||||
|
RuleIDs: m.RuleIDs,
|
||||||
|
Expression: m.Expression,
|
||||||
|
CreatedAt: m.CreatedAt,
|
||||||
|
CreatedBy: m.CreatedBy,
|
||||||
|
UpdatedAt: m.UpdatedAt,
|
||||||
|
UpdatedBy: m.UpdatedBy,
|
||||||
|
Status: status,
|
||||||
|
Kind: kind,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConvertStorableToGettable converts a StorablePlannedMaintenance to GettablePlannedMaintenance.
|
||||||
|
func ConvertStorableToGettable(s *StorablePlannedMaintenance) *GettablePlannedMaintenance {
|
||||||
|
var ruleIDs []string
|
||||||
|
if s.RuleIDs != "" {
|
||||||
|
if err := json.Unmarshal([]byte(s.RuleIDs), &ruleIDs); err != nil {
|
||||||
|
slog.Error("failed to unmarshal rule_ids from DB", "error", err, "raw", s.RuleIDs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &GettablePlannedMaintenance{
|
||||||
|
Id: s.ID.StringValue(),
|
||||||
|
Name: s.Name,
|
||||||
|
Description: s.Description,
|
||||||
|
Schedule: s.Schedule,
|
||||||
|
RuleIDs: ruleIDs,
|
||||||
|
Expression: s.Expression,
|
||||||
|
CreatedAt: s.CreatedAt,
|
||||||
|
UpdatedAt: s.UpdatedAt,
|
||||||
|
CreatedBy: s.CreatedBy,
|
||||||
|
UpdatedBy: s.UpdatedBy,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type MaintenanceStore interface {
|
||||||
|
CreatePlannedMaintenance(context.Context, GettablePlannedMaintenance) (valuer.UUID, error)
|
||||||
|
DeletePlannedMaintenance(context.Context, valuer.UUID) error
|
||||||
|
GetPlannedMaintenanceByID(context.Context, valuer.UUID) (*GettablePlannedMaintenance, error)
|
||||||
|
EditPlannedMaintenance(context.Context, GettablePlannedMaintenance, valuer.UUID) error
|
||||||
|
GetAllPlannedMaintenance(context.Context, string) ([]*GettablePlannedMaintenance, error)
|
||||||
|
}
|
||||||
1272
pkg/types/alertmanagertypes/maintenance_test.go
Normal file
1272
pkg/types/alertmanagertypes/maintenance_test.go
Normal file
File diff suppressed because it is too large
Load Diff
86
pkg/types/alertmanagertypes/recurrence.go
Normal file
86
pkg/types/alertmanagertypes/recurrence.go
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
package alertmanagertypes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql/driver"
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RepeatType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
RepeatTypeDaily RepeatType = "daily"
|
||||||
|
RepeatTypeWeekly RepeatType = "weekly"
|
||||||
|
RepeatTypeMonthly RepeatType = "monthly"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RepeatOn string
|
||||||
|
|
||||||
|
const (
|
||||||
|
RepeatOnSunday RepeatOn = "sunday"
|
||||||
|
RepeatOnMonday RepeatOn = "monday"
|
||||||
|
RepeatOnTuesday RepeatOn = "tuesday"
|
||||||
|
RepeatOnWednesday RepeatOn = "wednesday"
|
||||||
|
RepeatOnThursday RepeatOn = "thursday"
|
||||||
|
RepeatOnFriday RepeatOn = "friday"
|
||||||
|
RepeatOnSaturday RepeatOn = "saturday"
|
||||||
|
)
|
||||||
|
|
||||||
|
var RepeatOnAllMap = map[RepeatOn]time.Weekday{
|
||||||
|
RepeatOnSunday: time.Sunday,
|
||||||
|
RepeatOnMonday: time.Monday,
|
||||||
|
RepeatOnTuesday: time.Tuesday,
|
||||||
|
RepeatOnWednesday: time.Wednesday,
|
||||||
|
RepeatOnThursday: time.Thursday,
|
||||||
|
RepeatOnFriday: time.Friday,
|
||||||
|
RepeatOnSaturday: time.Saturday,
|
||||||
|
}
|
||||||
|
|
||||||
|
type Duration time.Duration
|
||||||
|
|
||||||
|
func (d Duration) MarshalJSON() ([]byte, error) {
|
||||||
|
return json.Marshal(time.Duration(d).String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Duration) UnmarshalJSON(b []byte) error {
|
||||||
|
var v interface{}
|
||||||
|
if err := json.Unmarshal(b, &v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
switch value := v.(type) {
|
||||||
|
case float64:
|
||||||
|
*d = Duration(time.Duration(value))
|
||||||
|
return nil
|
||||||
|
case string:
|
||||||
|
tmp, err := time.ParseDuration(value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*d = Duration(tmp)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid duration")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Recurrence struct {
|
||||||
|
StartTime time.Time `json:"startTime"`
|
||||||
|
EndTime *time.Time `json:"endTime,omitempty"`
|
||||||
|
Duration Duration `json:"duration"`
|
||||||
|
RepeatType RepeatType `json:"repeatType"`
|
||||||
|
RepeatOn []RepeatOn `json:"repeatOn"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Recurrence) Scan(src interface{}) error {
|
||||||
|
if data, ok := src.([]byte); ok {
|
||||||
|
return json.Unmarshal(data, r)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Recurrence) Value() (driver.Value, error) {
|
||||||
|
return json.Marshal(r)
|
||||||
|
}
|
||||||
132
pkg/types/alertmanagertypes/schedule.go
Normal file
132
pkg/types/alertmanagertypes/schedule.go
Normal file
@@ -0,0 +1,132 @@
|
|||||||
|
package alertmanagertypes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql/driver"
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Schedule struct {
|
||||||
|
Timezone string `json:"timezone"`
|
||||||
|
StartTime time.Time `json:"startTime,omitempty"`
|
||||||
|
EndTime time.Time `json:"endTime,omitempty"`
|
||||||
|
Recurrence *Recurrence `json:"recurrence"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Schedule) Scan(src interface{}) error {
|
||||||
|
if data, ok := src.([]byte); ok {
|
||||||
|
return json.Unmarshal(data, s)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Schedule) Value() (driver.Value, error) {
|
||||||
|
return json.Marshal(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Schedule) MarshalJSON() ([]byte, error) {
|
||||||
|
loc, err := time.LoadLocation(s.Timezone)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var startTime, endTime time.Time
|
||||||
|
if !s.StartTime.IsZero() {
|
||||||
|
startTime = time.Date(s.StartTime.Year(), s.StartTime.Month(), s.StartTime.Day(), s.StartTime.Hour(), s.StartTime.Minute(), s.StartTime.Second(), s.StartTime.Nanosecond(), loc)
|
||||||
|
}
|
||||||
|
if !s.EndTime.IsZero() {
|
||||||
|
endTime = time.Date(s.EndTime.Year(), s.EndTime.Month(), s.EndTime.Day(), s.EndTime.Hour(), s.EndTime.Minute(), s.EndTime.Second(), s.EndTime.Nanosecond(), loc)
|
||||||
|
}
|
||||||
|
|
||||||
|
var recurrence *Recurrence
|
||||||
|
if s.Recurrence != nil {
|
||||||
|
recStartTime := time.Date(s.Recurrence.StartTime.Year(), s.Recurrence.StartTime.Month(), s.Recurrence.StartTime.Day(), s.Recurrence.StartTime.Hour(), s.Recurrence.StartTime.Minute(), s.Recurrence.StartTime.Second(), s.Recurrence.StartTime.Nanosecond(), loc)
|
||||||
|
var recEndTime *time.Time
|
||||||
|
if s.Recurrence.EndTime != nil {
|
||||||
|
end := time.Date(s.Recurrence.EndTime.Year(), s.Recurrence.EndTime.Month(), s.Recurrence.EndTime.Day(), s.Recurrence.EndTime.Hour(), s.Recurrence.EndTime.Minute(), s.Recurrence.EndTime.Second(), s.Recurrence.EndTime.Nanosecond(), loc)
|
||||||
|
recEndTime = &end
|
||||||
|
}
|
||||||
|
recurrence = &Recurrence{
|
||||||
|
StartTime: recStartTime,
|
||||||
|
EndTime: recEndTime,
|
||||||
|
Duration: s.Recurrence.Duration,
|
||||||
|
RepeatType: s.Recurrence.RepeatType,
|
||||||
|
RepeatOn: s.Recurrence.RepeatOn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(&struct {
|
||||||
|
Timezone string `json:"timezone"`
|
||||||
|
StartTime string `json:"startTime"`
|
||||||
|
EndTime string `json:"endTime"`
|
||||||
|
Recurrence *Recurrence `json:"recurrence,omitempty"`
|
||||||
|
}{
|
||||||
|
Timezone: s.Timezone,
|
||||||
|
StartTime: startTime.Format(time.RFC3339),
|
||||||
|
EndTime: endTime.Format(time.RFC3339),
|
||||||
|
Recurrence: recurrence,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Schedule) UnmarshalJSON(data []byte) error {
|
||||||
|
aux := &struct {
|
||||||
|
Timezone string `json:"timezone"`
|
||||||
|
StartTime string `json:"startTime"`
|
||||||
|
EndTime string `json:"endTime"`
|
||||||
|
Recurrence *Recurrence `json:"recurrence,omitempty"`
|
||||||
|
}{}
|
||||||
|
if err := json.Unmarshal(data, aux); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
loc, err := time.LoadLocation(aux.Timezone)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var startTime time.Time
|
||||||
|
if aux.StartTime != "" {
|
||||||
|
startTime, err = time.Parse(time.RFC3339, aux.StartTime)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.StartTime = time.Date(startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), startTime.Second(), startTime.Nanosecond(), loc)
|
||||||
|
}
|
||||||
|
|
||||||
|
var endTime time.Time
|
||||||
|
if aux.EndTime != "" {
|
||||||
|
endTime, err = time.Parse(time.RFC3339, aux.EndTime)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.EndTime = time.Date(endTime.Year(), endTime.Month(), endTime.Day(), endTime.Hour(), endTime.Minute(), endTime.Second(), endTime.Nanosecond(), loc)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Timezone = aux.Timezone
|
||||||
|
|
||||||
|
if aux.Recurrence != nil {
|
||||||
|
recStartTime, err := time.Parse(time.RFC3339, aux.Recurrence.StartTime.Format(time.RFC3339))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var recEndTime *time.Time
|
||||||
|
if aux.Recurrence.EndTime != nil {
|
||||||
|
end, err := time.Parse(time.RFC3339, aux.Recurrence.EndTime.Format(time.RFC3339))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
endConverted := time.Date(end.Year(), end.Month(), end.Day(), end.Hour(), end.Minute(), end.Second(), end.Nanosecond(), loc)
|
||||||
|
recEndTime = &endConverted
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Recurrence = &Recurrence{
|
||||||
|
StartTime: time.Date(recStartTime.Year(), recStartTime.Month(), recStartTime.Day(), recStartTime.Hour(), recStartTime.Minute(), recStartTime.Second(), recStartTime.Nanosecond(), loc),
|
||||||
|
EndTime: recEndTime,
|
||||||
|
Duration: aux.Recurrence.Duration,
|
||||||
|
RepeatType: aux.Recurrence.RepeatType,
|
||||||
|
RepeatOn: aux.Recurrence.RepeatOn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
137
pkg/types/alertmanagertypes/statehistory.go
Normal file
137
pkg/types/alertmanagertypes/statehistory.go
Normal file
@@ -0,0 +1,137 @@
|
|||||||
|
package alertmanagertypes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/errors"
|
||||||
|
"github.com/SigNoz/signoz/pkg/valuer"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrCodeInvalidStateHistoryQuery = errors.MustNewCode("invalid_state_history_query")
|
||||||
|
)
|
||||||
|
|
||||||
|
// AlertState represents the state of an alert series (firing, inactive, muted, no_data)
|
||||||
|
// or the overall state of a rule (firing, inactive).
|
||||||
|
type AlertState struct {
|
||||||
|
valuer.String
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
AlertStateFiring = AlertState{valuer.NewString("firing")}
|
||||||
|
AlertStateInactive = AlertState{valuer.NewString("inactive")}
|
||||||
|
AlertStateMuted = AlertState{valuer.NewString("muted")}
|
||||||
|
AlertStateNoData = AlertState{valuer.NewString("no_data")}
|
||||||
|
)
|
||||||
|
|
||||||
|
// SortOrder represents the sort direction for query results.
|
||||||
|
type SortOrder struct {
|
||||||
|
valuer.String
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
SortOrderAsc = SortOrder{valuer.NewString("asc")}
|
||||||
|
SortOrderDesc = SortOrder{valuer.NewString("desc")}
|
||||||
|
)
|
||||||
|
|
||||||
|
// RuleStateHistory represents a single state transition entry stored in ClickHouse.
|
||||||
|
// Only transitions are recorded, not every evaluation.
|
||||||
|
type RuleStateHistory struct {
|
||||||
|
OrgID string `json:"orgId"`
|
||||||
|
RuleID string `json:"ruleId"`
|
||||||
|
RuleName string `json:"ruleName"`
|
||||||
|
OverallState string `json:"overallState"` // aggregate rule state: "firing" if any series fires
|
||||||
|
OverallStateChanged bool `json:"overallStateChanged"` // true if this entry changed the overall state
|
||||||
|
State string `json:"state"` // per-series state: firing, inactive, muted, no_data
|
||||||
|
StateChanged bool `json:"stateChanged"` // always true in v2 (only transitions stored)
|
||||||
|
UnixMilli int64 `json:"unixMilli"`
|
||||||
|
Labels string `json:"labels"` // JSON-encoded label set
|
||||||
|
Fingerprint uint64 `json:"fingerprint"` // hash of the full label set
|
||||||
|
Value float64 `json:"value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryRuleStateHistory is the request body for all v2 state history API endpoints.
|
||||||
|
type QueryRuleStateHistory struct {
|
||||||
|
Start int64 `json:"start"` // unix millis, required
|
||||||
|
End int64 `json:"end"` // unix millis, required
|
||||||
|
State AlertState `json:"state"` // optional filter: firing, inactive, muted
|
||||||
|
Offset int64 `json:"offset"`
|
||||||
|
Limit int64 `json:"limit"`
|
||||||
|
Order SortOrder `json:"order"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *QueryRuleStateHistory) Validate() error {
|
||||||
|
if q.Start == 0 || q.End == 0 {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidStateHistoryQuery, "start and end are required")
|
||||||
|
}
|
||||||
|
if q.Offset < 0 || q.Limit < 0 {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidStateHistoryQuery, "offset and limit must be greater than or equal to 0")
|
||||||
|
}
|
||||||
|
if q.Order.StringValue() != SortOrderAsc.StringValue() && q.Order.StringValue() != SortOrderDesc.StringValue() {
|
||||||
|
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidStateHistoryQuery, "order must be asc or desc")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RuleStateTimeline is the paginated response for the timeline endpoint.
|
||||||
|
type RuleStateTimeline struct {
|
||||||
|
Items []RuleStateHistory `json:"items"`
|
||||||
|
Total uint64 `json:"total"`
|
||||||
|
Labels map[string][]string `json:"labels"` // distinct label keys/values for filter UI
|
||||||
|
}
|
||||||
|
|
||||||
|
// RuleStateHistoryContributor is an alert series ranked by firing frequency.
|
||||||
|
type RuleStateHistoryContributor struct {
|
||||||
|
Fingerprint uint64 `json:"fingerprint"`
|
||||||
|
Labels string `json:"labels"` // JSON-encoded label set
|
||||||
|
Count uint64 `json:"count"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RuleStateTransition represents a contiguous time period during which a rule
|
||||||
|
// was in a particular overall state (firing or inactive).
|
||||||
|
type RuleStateTransition struct {
|
||||||
|
State AlertState `json:"state"`
|
||||||
|
Start int64 `json:"start"`
|
||||||
|
End int64 `json:"end"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RuleStats compares trigger counts and avg resolution times between the current
|
||||||
|
// time period and a previous period of equal length.
|
||||||
|
type RuleStats struct {
|
||||||
|
TotalCurrentTriggers uint64 `json:"totalCurrentTriggers"`
|
||||||
|
TotalPastTriggers uint64 `json:"totalPastTriggers"`
|
||||||
|
CurrentTriggersSeries *Series `json:"currentTriggersSeries"`
|
||||||
|
PastTriggersSeries *Series `json:"pastTriggersSeries"`
|
||||||
|
CurrentAvgResolutionTime float64 `json:"currentAvgResolutionTime"`
|
||||||
|
PastAvgResolutionTime float64 `json:"pastAvgResolutionTime"`
|
||||||
|
CurrentAvgResolutionTimeSeries *Series `json:"currentAvgResolutionTimeSeries"`
|
||||||
|
PastAvgResolutionTimeSeries *Series `json:"pastAvgResolutionTimeSeries"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Series struct {
|
||||||
|
Labels map[string]string `json:"labels"`
|
||||||
|
Points []Point `json:"values"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Point struct {
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
Value float64 `json:"value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// StateHistoryStore provides read and write access to rule state history in ClickHouse.
|
||||||
|
type StateHistoryStore interface {
|
||||||
|
WriteRuleStateHistory(ctx context.Context, entries []RuleStateHistory) error
|
||||||
|
// GetLastSavedRuleStateHistory returns the most recent transition per fingerprint,
|
||||||
|
// used to restore in-memory state after restart.
|
||||||
|
GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]RuleStateHistory, error)
|
||||||
|
|
||||||
|
GetRuleStateHistoryTimeline(ctx context.Context, orgID string, ruleID string, params *QueryRuleStateHistory) (*RuleStateTimeline, error)
|
||||||
|
GetRuleStateHistoryTopContributors(ctx context.Context, orgID string, ruleID string, params *QueryRuleStateHistory) ([]RuleStateHistoryContributor, error)
|
||||||
|
// GetOverallStateTransitions returns firing/inactive periods with gap-filling.
|
||||||
|
GetOverallStateTransitions(ctx context.Context, orgID string, ruleID string, params *QueryRuleStateHistory) ([]RuleStateTransition, error)
|
||||||
|
GetTotalTriggers(ctx context.Context, orgID string, ruleID string, params *QueryRuleStateHistory) (uint64, error)
|
||||||
|
GetTriggersByInterval(ctx context.Context, orgID string, ruleID string, params *QueryRuleStateHistory) (*Series, error)
|
||||||
|
// GetAvgResolutionTime returns avg seconds between firing and next resolution.
|
||||||
|
GetAvgResolutionTime(ctx context.Context, orgID string, ruleID string, params *QueryRuleStateHistory) (float64, error)
|
||||||
|
GetAvgResolutionTimeByInterval(ctx context.Context, orgID string, ruleID string, params *QueryRuleStateHistory) (*Series, error)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user