mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-27 14:10:30 +01:00
Compare commits
5 Commits
alertmanag
...
nv/10890
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2a6539bd80 | ||
|
|
7d2457d305 | ||
|
|
71351b2856 | ||
|
|
018c78f4ff | ||
|
|
31e2c1b585 |
@@ -49,7 +49,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
rules = append(rules, tr)
|
||||
|
||||
// create ch rule task for evaluation
|
||||
task = newTask(baserules.TaskTypeCh, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.OrgID)
|
||||
task = newTask(baserules.TaskTypeCh, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
|
||||
|
||||
@@ -73,7 +73,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
rules = append(rules, pr)
|
||||
|
||||
// create promql rule task for evaluation
|
||||
task = newTask(baserules.TaskTypeProm, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.OrgID)
|
||||
task = newTask(baserules.TaskTypeProm, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
} else if opts.Rule.RuleType == ruletypes.RuleTypeAnomaly {
|
||||
// create anomaly rule
|
||||
@@ -96,7 +96,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
|
||||
rules = append(rules, ar)
|
||||
|
||||
// create anomaly rule task for evaluation
|
||||
task = newTask(baserules.TaskTypeCh, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.OrgID)
|
||||
task = newTask(baserules.TaskTypeCh, opts.TaskName, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
} else {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
|
||||
@@ -210,9 +210,9 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, error) {
|
||||
}
|
||||
|
||||
// newTask returns an appropriate group for the rule type
|
||||
func newTask(taskType baserules.TaskType, name string, frequency time.Duration, rules []baserules.Rule, opts *baserules.ManagerOptions, notify baserules.NotifyFunc, orgID valuer.UUID) baserules.Task {
|
||||
func newTask(taskType baserules.TaskType, name string, frequency time.Duration, rules []baserules.Rule, opts *baserules.ManagerOptions, notify baserules.NotifyFunc, maintenanceStore ruletypes.MaintenanceStore, orgID valuer.UUID) baserules.Task {
|
||||
if taskType == baserules.TaskTypeCh {
|
||||
return baserules.NewRuleTask(name, "", frequency, rules, opts, notify, orgID)
|
||||
return baserules.NewRuleTask(name, "", frequency, rules, opts, notify, maintenanceStore, orgID)
|
||||
}
|
||||
return baserules.NewPromRuleTask(name, "", frequency, rules, opts, notify, orgID)
|
||||
return baserules.NewPromRuleTask(name, "", frequency, rules, opts, notify, maintenanceStore, orgID)
|
||||
}
|
||||
|
||||
@@ -1,94 +0,0 @@
|
||||
package alertmanagerserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
)
|
||||
|
||||
// MaintenanceMuter implements types.Muter for maintenance windows.
|
||||
// It suppresses alerts whose ruleId label matches an active maintenance schedule.
|
||||
// Results are cached for cacheTTL to avoid a DB query on every per-alert check.
|
||||
type MaintenanceMuter struct {
|
||||
maintenanceStore ruletypes.MaintenanceStore
|
||||
orgID string
|
||||
logger *slog.Logger
|
||||
|
||||
mu sync.RWMutex
|
||||
cached []*ruletypes.PlannedMaintenance
|
||||
cacheExpiry time.Time
|
||||
}
|
||||
|
||||
const maintenanceCacheTTL = 30 * time.Second
|
||||
|
||||
func NewMaintenanceMuter(store ruletypes.MaintenanceStore, orgID string, logger *slog.Logger) *MaintenanceMuter {
|
||||
return &MaintenanceMuter{
|
||||
maintenanceStore: store,
|
||||
orgID: orgID,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MaintenanceMuter) Mutes(ctx context.Context, lset model.LabelSet) bool {
|
||||
ruleID := string(lset[ruletypes.AlertRuleIDLabel])
|
||||
if ruleID == "" {
|
||||
return false
|
||||
}
|
||||
now := time.Now()
|
||||
for _, mw := range m.getMaintenances(ctx) {
|
||||
if mw.ShouldSkip(ruleID, now) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *MaintenanceMuter) getMaintenances(ctx context.Context) []*ruletypes.PlannedMaintenance {
|
||||
m.mu.RLock()
|
||||
if time.Now().Before(m.cacheExpiry) {
|
||||
cached := m.cached
|
||||
m.mu.RUnlock()
|
||||
return cached
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
// Double-check after acquiring write lock.
|
||||
if time.Now().Before(m.cacheExpiry) {
|
||||
return m.cached
|
||||
}
|
||||
|
||||
mws, err := m.maintenanceStore.ListPlannedMaintenance(ctx, m.orgID)
|
||||
if err != nil {
|
||||
m.logger.ErrorContext(ctx, "failed to list planned maintenance windows; alerts will not be suppressed", slog.String("org_id", m.orgID))
|
||||
return m.cached // return stale (potentially empty) cache on error
|
||||
}
|
||||
m.cached = mws
|
||||
m.cacheExpiry = time.Now().Add(maintenanceCacheTTL)
|
||||
return m.cached
|
||||
}
|
||||
|
||||
// maintenanceMuteStage wraps MaintenanceMuter as a notify.Stage.
|
||||
// We implement the stage directly rather than using notify.NewMuteStage to avoid
|
||||
// a dependency on the unexported *notify.Metrics field of PipelineBuilder.
|
||||
type maintenanceMuteStage struct {
|
||||
muter *MaintenanceMuter
|
||||
}
|
||||
|
||||
func (s *maintenanceMuteStage) Exec(ctx context.Context, _ *slog.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
|
||||
filtered := make([]*types.Alert, 0, len(alerts))
|
||||
for _, a := range alerts {
|
||||
if !s.muter.Mutes(ctx, a.Labels) {
|
||||
filtered = append(filtered, a)
|
||||
}
|
||||
}
|
||||
return ctx, filtered, nil
|
||||
}
|
||||
@@ -1,120 +0,0 @@
|
||||
// Copyright (c) 2026 SigNoz, Inc.
|
||||
// Copyright 2015 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package alertmanagerserver
|
||||
|
||||
// pipelineBuilder is a local copy of notify.PipelineBuilder that injects
|
||||
// the maintenance mute stage immediately before the receiver stage.
|
||||
//
|
||||
// We maintain our own copy so we can control exactly where in the pipeline
|
||||
// the maintenance stage runs (between the silence stage and the receiver),
|
||||
// which is not possible by wrapping the output of the upstream builder.
|
||||
//
|
||||
// Upstream pipeline order (notify.PipelineBuilder.New, notify.go:444):
|
||||
//
|
||||
// GossipSettle → Inhibit → TimeActive → TimeMute → Silence → [mms] → Receiver
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/alertmanager/featurecontrol"
|
||||
"github.com/prometheus/alertmanager/inhibit"
|
||||
"github.com/prometheus/alertmanager/nflog/nflogpb"
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/silence"
|
||||
"github.com/prometheus/alertmanager/timeinterval"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type pipelineBuilder struct {
|
||||
metrics *notify.Metrics
|
||||
ff featurecontrol.Flagger
|
||||
muter *MaintenanceMuter
|
||||
}
|
||||
|
||||
func newPipelineBuilder(
|
||||
r prometheus.Registerer,
|
||||
ff featurecontrol.Flagger,
|
||||
muter *MaintenanceMuter,
|
||||
) *pipelineBuilder {
|
||||
return &pipelineBuilder{
|
||||
metrics: notify.NewMetrics(r, ff),
|
||||
ff: ff,
|
||||
muter: muter,
|
||||
}
|
||||
}
|
||||
|
||||
// New returns a map of receivers to Stages, mirroring notify.PipelineBuilder.New
|
||||
// but inserting a maintenanceMuteStage between the silence stage and the receiver.
|
||||
func (pb *pipelineBuilder) New(
|
||||
receivers map[string][]notify.Integration,
|
||||
wait func() time.Duration,
|
||||
inhibitor *inhibit.Inhibitor,
|
||||
silencer *silence.Silencer,
|
||||
intervener *timeinterval.Intervener,
|
||||
marker types.GroupMarker,
|
||||
notificationLog notify.NotificationLog,
|
||||
peer notify.Peer,
|
||||
) notify.RoutingStage {
|
||||
rs := make(notify.RoutingStage, len(receivers))
|
||||
|
||||
ms := notify.NewGossipSettleStage(peer)
|
||||
is := notify.NewMuteStage(inhibitor, pb.metrics)
|
||||
tas := notify.NewTimeActiveStage(intervener, marker, pb.metrics)
|
||||
tms := notify.NewTimeMuteStage(intervener, marker, pb.metrics)
|
||||
ss := notify.NewMuteStage(silencer, pb.metrics)
|
||||
|
||||
var mms *maintenanceMuteStage
|
||||
if pb.muter != nil {
|
||||
mms = &maintenanceMuteStage{muter: pb.muter}
|
||||
}
|
||||
|
||||
for name := range receivers {
|
||||
stages := notify.MultiStage{ms, is, tas, tms, ss}
|
||||
if mms != nil {
|
||||
stages = append(stages, mms)
|
||||
}
|
||||
stages = append(stages, buildReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics))
|
||||
rs[name] = stages
|
||||
}
|
||||
|
||||
pb.metrics.InitializeFor(receivers)
|
||||
return rs
|
||||
}
|
||||
|
||||
// buildReceiverStage is a copy of notify.createReceiverStage (unexported upstream).
|
||||
func buildReceiverStage(
|
||||
name string,
|
||||
integrations []notify.Integration,
|
||||
wait func() time.Duration,
|
||||
notificationLog notify.NotificationLog,
|
||||
metrics *notify.Metrics,
|
||||
) notify.Stage {
|
||||
var fs notify.FanoutStage
|
||||
for i := range integrations {
|
||||
recv := &nflogpb.Receiver{
|
||||
GroupName: name,
|
||||
Integration: integrations[i].Name(),
|
||||
Idx: uint32(integrations[i].Index()),
|
||||
}
|
||||
var s notify.MultiStage
|
||||
s = append(s, notify.NewWaitStage(wait))
|
||||
s = append(s, notify.NewDedupStage(&integrations[i], notificationLog, recv))
|
||||
s = append(s, notify.NewRetryStage(integrations[i], name, metrics))
|
||||
s = append(s, notify.NewSetNotifiesStage(notificationLog, recv))
|
||||
fs = append(fs, s)
|
||||
}
|
||||
return fs
|
||||
}
|
||||
@@ -26,13 +26,14 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/alertmanager/nfmanager"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
)
|
||||
|
||||
// This is not a real snapshot file and will never be used. We need this placeholder to ensure maintenance runs on shutdown.
|
||||
// See https://github.com/prometheus/alertmanager/blob/3ee2cd0f1271e277295c02b6160507b4d193dde2/silence/silence.go#L435-L438
|
||||
// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L362.
|
||||
var snapfnoop string = "snapfnoop"
|
||||
var (
|
||||
// This is not a real file and will never be used. We need this placeholder to ensure maintenance runs on shutdown. See
|
||||
// https://github.com/prometheus/server/blob/3ee2cd0f1271e277295c02b6160507b4d193dde2/silence/silence.go#L435-L438
|
||||
// and https://github.com/prometheus/server/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L362.
|
||||
snapfnoop string = "snapfnoop"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
// logger is the logger for the alertmanager
|
||||
@@ -62,7 +63,7 @@ type Server struct {
|
||||
silencer *silence.Silencer
|
||||
silences *silence.Silences
|
||||
timeIntervals map[string][]timeinterval.TimeInterval
|
||||
pipelineBuilder *pipelineBuilder
|
||||
pipelineBuilder *notify.PipelineBuilder
|
||||
marker *alertmanagertypes.MemMarker
|
||||
tmpl *template.Template
|
||||
wg sync.WaitGroup
|
||||
@@ -70,16 +71,7 @@ type Server struct {
|
||||
notificationManager nfmanager.NotificationManager
|
||||
}
|
||||
|
||||
func New(
|
||||
ctx context.Context,
|
||||
logger *slog.Logger,
|
||||
registry prometheus.Registerer,
|
||||
srvConfig Config,
|
||||
orgID string,
|
||||
stateStore alertmanagertypes.StateStore,
|
||||
nfManager nfmanager.NotificationManager,
|
||||
maintenanceStore ruletypes.MaintenanceStore,
|
||||
) (*Server, error) {
|
||||
func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore, nfManager nfmanager.NotificationManager) (*Server, error) {
|
||||
server := &Server{
|
||||
logger: logger.With(slog.String("pkg", "go.signoz.io/pkg/alertmanager/alertmanagerserver")),
|
||||
registry: registry,
|
||||
@@ -168,6 +160,7 @@ func New(
|
||||
|
||||
return c, server.stateStore.Set(ctx, storableSilences)
|
||||
})
|
||||
|
||||
}()
|
||||
|
||||
// Start maintenance for notification logs
|
||||
@@ -203,11 +196,7 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var muter *MaintenanceMuter
|
||||
if maintenanceStore != nil {
|
||||
muter = NewMaintenanceMuter(maintenanceStore, orgID, server.logger)
|
||||
}
|
||||
server.pipelineBuilder = newPipelineBuilder(signozRegisterer, featurecontrol.NoopFlags{}, muter)
|
||||
server.pipelineBuilder = notify.NewPipelineBuilder(signozRegisterer, featurecontrol.NoopFlags{})
|
||||
server.dispatcherMetrics = NewDispatcherMetrics(false, signozRegisterer)
|
||||
|
||||
return server, nil
|
||||
|
||||
@@ -90,7 +90,7 @@ func TestEndToEndAlertManagerFlow(t *testing.T) {
|
||||
stateStore := alertmanagertypestest.NewStateStore()
|
||||
registry := prometheus.NewRegistry()
|
||||
logger := slog.New(slog.DiscardHandler)
|
||||
server, err := New(context.Background(), logger, registry, srvCfg, orgID, stateStore, notificationManager, nil)
|
||||
server, err := New(context.Background(), logger, registry, srvCfg, orgID, stateStore, notificationManager)
|
||||
require.NoError(t, err)
|
||||
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, orgID)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -25,7 +25,7 @@ import (
|
||||
|
||||
func TestServerSetConfigAndStop(t *testing.T) {
|
||||
notificationManager := nfmanagertest.NewMock()
|
||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager, nil)
|
||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager)
|
||||
require.NoError(t, err)
|
||||
|
||||
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
|
||||
@@ -37,7 +37,7 @@ func TestServerSetConfigAndStop(t *testing.T) {
|
||||
|
||||
func TestServerTestReceiverTypeWebhook(t *testing.T) {
|
||||
notificationManager := nfmanagertest.NewMock()
|
||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager, nil)
|
||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), NewConfig(), "1", alertmanagertypestest.NewStateStore(), notificationManager)
|
||||
require.NoError(t, err)
|
||||
|
||||
amConfig, err := alertmanagertypes.NewDefaultConfig(alertmanagertypes.GlobalConfig{}, alertmanagertypes.RouteConfig{GroupInterval: 1 * time.Minute, RepeatInterval: 1 * time.Minute, GroupWait: 1 * time.Minute}, "1")
|
||||
@@ -85,7 +85,7 @@ func TestServerPutAlerts(t *testing.T) {
|
||||
srvCfg := NewConfig()
|
||||
srvCfg.Route.GroupInterval = 1 * time.Second
|
||||
notificationManager := nfmanagertest.NewMock()
|
||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager, nil)
|
||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager)
|
||||
require.NoError(t, err)
|
||||
|
||||
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
|
||||
@@ -133,7 +133,7 @@ func TestServerTestAlert(t *testing.T) {
|
||||
srvCfg := NewConfig()
|
||||
srvCfg.Route.GroupInterval = 1 * time.Second
|
||||
notificationManager := nfmanagertest.NewMock()
|
||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager, nil)
|
||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager)
|
||||
require.NoError(t, err)
|
||||
|
||||
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
|
||||
@@ -238,7 +238,7 @@ func TestServerTestAlertContinuesOnFailure(t *testing.T) {
|
||||
srvCfg := NewConfig()
|
||||
srvCfg.Route.GroupInterval = 1 * time.Second
|
||||
notificationManager := nfmanagertest.NewMock()
|
||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager, nil)
|
||||
server, err := New(context.Background(), slog.New(slog.DiscardHandler), prometheus.NewRegistry(), srvCfg, "1", stateStore, notificationManager)
|
||||
require.NoError(t, err)
|
||||
|
||||
amConfig, err := alertmanagertypes.NewDefaultConfig(srvCfg.Global, srvCfg.Route, "1")
|
||||
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
@@ -40,18 +39,16 @@ type Service struct {
|
||||
serversMtx sync.RWMutex
|
||||
|
||||
notificationManager nfmanager.NotificationManager
|
||||
|
||||
maintenanceStore ruletypes.MaintenanceStore
|
||||
}
|
||||
|
||||
func New(
|
||||
ctx context.Context,
|
||||
settings factory.ScopedProviderSettings,
|
||||
config alertmanagerserver.Config,
|
||||
stateStore alertmanagertypes.StateStore,
|
||||
configStore alertmanagertypes.ConfigStore,
|
||||
orgGetter organization.Getter,
|
||||
nfManager nfmanager.NotificationManager,
|
||||
maintenanceStore ruletypes.MaintenanceStore,
|
||||
) *Service {
|
||||
service := &Service{
|
||||
config: config,
|
||||
@@ -62,7 +59,6 @@ func New(
|
||||
servers: make(map[string]*alertmanagerserver.Server),
|
||||
serversMtx: sync.RWMutex{},
|
||||
notificationManager: nfManager,
|
||||
maintenanceStore: maintenanceStore,
|
||||
}
|
||||
|
||||
return service
|
||||
@@ -181,10 +177,7 @@ func (service *Service) newServer(ctx context.Context, orgID string) (*alertmana
|
||||
return nil, err
|
||||
}
|
||||
|
||||
server, err := alertmanagerserver.New(
|
||||
ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID,
|
||||
service.stateStore, service.notificationManager, service.maintenanceStore,
|
||||
)
|
||||
server, err := alertmanagerserver.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), service.config, orgID, service.stateStore, service.notificationManager)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
@@ -31,49 +30,35 @@ type provider struct {
|
||||
configStore alertmanagertypes.ConfigStore
|
||||
stateStore alertmanagertypes.StateStore
|
||||
notificationManager nfmanager.NotificationManager
|
||||
maintenanceStore ruletypes.MaintenanceStore
|
||||
stopC chan struct{}
|
||||
}
|
||||
|
||||
func NewFactory(
|
||||
sqlstore sqlstore.SQLStore,
|
||||
orgGetter organization.Getter,
|
||||
notificationManager nfmanager.NotificationManager,
|
||||
maintenanceStore ruletypes.MaintenanceStore,
|
||||
) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
func NewFactory(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationManager nfmanager.NotificationManager) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) {
|
||||
return New(settings, config, sqlstore, orgGetter, notificationManager, maintenanceStore)
|
||||
return New(ctx, settings, config, sqlstore, orgGetter, notificationManager)
|
||||
})
|
||||
}
|
||||
|
||||
func New(
|
||||
providerSettings factory.ProviderSettings,
|
||||
config alertmanager.Config,
|
||||
sqlstore sqlstore.SQLStore,
|
||||
orgGetter organization.Getter,
|
||||
notificationManager nfmanager.NotificationManager,
|
||||
maintenanceStore ruletypes.MaintenanceStore,
|
||||
) (*provider, error) {
|
||||
func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore, orgGetter organization.Getter, notificationManager nfmanager.NotificationManager) (*provider, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/alertmanager/signozalertmanager")
|
||||
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)
|
||||
stateStore := sqlalertmanagerstore.NewStateStore(sqlstore)
|
||||
|
||||
p := &provider{
|
||||
service: alertmanager.New(
|
||||
ctx,
|
||||
settings,
|
||||
config.Signoz.Config,
|
||||
stateStore,
|
||||
configStore,
|
||||
orgGetter,
|
||||
notificationManager,
|
||||
maintenanceStore,
|
||||
),
|
||||
settings: settings,
|
||||
config: config,
|
||||
configStore: configStore,
|
||||
stateStore: stateStore,
|
||||
notificationManager: notificationManager,
|
||||
maintenanceStore: maintenanceStore,
|
||||
stopC: make(chan struct{}),
|
||||
}
|
||||
|
||||
|
||||
@@ -138,7 +138,14 @@ func (m *module) listMeterMetrics(ctx context.Context, params *metricsexplorerty
|
||||
|
||||
func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *metricsexplorertypes.ListMetricsParams) (*metricsexplorertypes.ListMetricsResponse, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("DISTINCT metric_name")
|
||||
sb.Select(
|
||||
"metric_name",
|
||||
"anyLast(description) AS description",
|
||||
"anyLast(type) AS metric_type",
|
||||
"argMax(unit, unix_milli) AS metric_unit",
|
||||
"anyLast(temporality) AS temporality",
|
||||
"anyLast(is_monotonic) AS is_monotonic",
|
||||
)
|
||||
|
||||
if params.Start != nil && params.End != nil {
|
||||
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(*params.Start), uint64(*params.End), nil)
|
||||
@@ -157,6 +164,7 @@ func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *met
|
||||
sb.Where(sb.Like("lower(metric_name)", fmt.Sprintf("%%%s%%", searchLower)))
|
||||
}
|
||||
|
||||
sb.GroupBy("metric_name")
|
||||
sb.OrderBy("metric_name ASC")
|
||||
sb.Limit(params.Limit)
|
||||
|
||||
@@ -170,43 +178,47 @@ func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *met
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
metricNames := make([]string, 0)
|
||||
var metrics []metricsexplorertypes.ListMetric
|
||||
var metricNames []string
|
||||
for rows.Next() {
|
||||
var name string
|
||||
if err := rows.Scan(&name); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metric name")
|
||||
var metric metricsexplorertypes.ListMetric
|
||||
if err := rows.Scan(
|
||||
&metric.MetricName,
|
||||
&metric.Description,
|
||||
&metric.MetricType,
|
||||
&metric.MetricUnit,
|
||||
&metric.Temporality,
|
||||
&metric.IsMonotonic,
|
||||
); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metric")
|
||||
}
|
||||
metricNames = append(metricNames, name)
|
||||
metrics = append(metrics, metric)
|
||||
metricNames = append(metricNames, metric.MetricName)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metric names")
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metrics")
|
||||
}
|
||||
|
||||
if len(metricNames) == 0 {
|
||||
if len(metrics) == 0 {
|
||||
return &metricsexplorertypes.ListMetricsResponse{
|
||||
Metrics: []metricsexplorertypes.ListMetric{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
metadata, err := m.GetMetricMetadataMulti(ctx, orgID, metricNames)
|
||||
// Overlay any user-updated metadata on top of the timeseries metadata.
|
||||
updatedMetadata, err := m.fetchUpdatedMetadata(ctx, orgID, metricNames)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metrics := make([]metricsexplorertypes.ListMetric, 0, len(metricNames))
|
||||
for _, name := range metricNames {
|
||||
metric := metricsexplorertypes.ListMetric{
|
||||
MetricName: name,
|
||||
for i := range metrics {
|
||||
if meta, ok := updatedMetadata[metrics[i].MetricName]; ok && meta != nil {
|
||||
metrics[i].Description = meta.Description
|
||||
metrics[i].MetricType = meta.MetricType
|
||||
metrics[i].MetricUnit = meta.MetricUnit
|
||||
metrics[i].Temporality = meta.Temporality
|
||||
metrics[i].IsMonotonic = meta.IsMonotonic
|
||||
}
|
||||
if meta, ok := metadata[name]; ok && meta != nil {
|
||||
metric.Description = meta.Description
|
||||
metric.MetricType = meta.MetricType
|
||||
metric.MetricUnit = meta.MetricUnit
|
||||
metric.Temporality = meta.Temporality
|
||||
metric.IsMonotonic = meta.IsMonotonic
|
||||
}
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
|
||||
return &metricsexplorertypes.ListMetricsResponse{
|
||||
@@ -243,19 +255,18 @@ func (m *module) GetStats(ctx context.Context, orgID valuer.UUID, req *metricsex
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Get metadata for all metrics
|
||||
// Overlay any user-updated metadata on top of the timeseries metadata
|
||||
// that was already fetched in the combined query.
|
||||
metricNames := make([]string, len(metricStats))
|
||||
for i := range metricStats {
|
||||
metricNames[i] = metricStats[i].MetricName
|
||||
}
|
||||
|
||||
metadata, err := m.GetMetricMetadataMulti(ctx, orgID, metricNames)
|
||||
updatedMetadata, err := m.fetchUpdatedMetadata(ctx, orgID, metricNames)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Enrich stats with metadata
|
||||
enrichStatsWithMetadata(metricStats, metadata)
|
||||
enrichStatsWithMetadata(metricStats, updatedMetadata)
|
||||
|
||||
return &metricsexplorertypes.StatsResponse{
|
||||
Metrics: metricStats,
|
||||
@@ -984,11 +995,14 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
|
||||
|
||||
// Timeseries counts per metric
|
||||
// Timeseries counts and metadata per metric.
|
||||
tsSB := sqlbuilder.NewSelectBuilder()
|
||||
tsSB.Select(
|
||||
"metric_name",
|
||||
"uniq(fingerprint) AS timeseries",
|
||||
"anyLast(description) AS description",
|
||||
"anyLast(type) AS metric_type",
|
||||
"argMax(unit, unix_milli) AS metric_unit",
|
||||
)
|
||||
tsSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
|
||||
tsSB.Where(tsSB.Between("unix_milli", start, end))
|
||||
@@ -1036,6 +1050,9 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
"COALESCE(ts.timeseries, 0) AS timeseries",
|
||||
"COALESCE(s.samples, 0) AS samples",
|
||||
"COUNT(*) OVER() AS total",
|
||||
"ts.description AS description",
|
||||
"ts.metric_type AS metric_type",
|
||||
"ts.metric_unit AS metric_unit",
|
||||
)
|
||||
finalSB.From("__time_series_counts ts")
|
||||
finalSB.JoinWithOption(sqlbuilder.FullOuterJoin, "__sample_counts s", "ts.metric_name = s.metric_name")
|
||||
@@ -1071,7 +1088,7 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
metricStat metricsexplorertypes.Stat
|
||||
rowTotal uint64
|
||||
)
|
||||
if err := rows.Scan(&metricStat.MetricName, &metricStat.TimeSeries, &metricStat.Samples, &rowTotal); err != nil {
|
||||
if err := rows.Scan(&metricStat.MetricName, &metricStat.TimeSeries, &metricStat.Samples, &rowTotal, &metricStat.Description, &metricStat.MetricType, &metricStat.MetricUnit); err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metrics stats row")
|
||||
}
|
||||
metricStats = append(metricStats, metricStat)
|
||||
|
||||
@@ -32,10 +32,11 @@ import (
|
||||
)
|
||||
|
||||
type PrepareTaskOptions struct {
|
||||
Rule *ruletypes.PostableRule
|
||||
TaskName string
|
||||
RuleStore ruletypes.RuleStore
|
||||
Querier querier.Querier
|
||||
Rule *ruletypes.PostableRule
|
||||
TaskName string
|
||||
RuleStore ruletypes.RuleStore
|
||||
MaintenanceStore ruletypes.MaintenanceStore
|
||||
Querier querier.Querier
|
||||
Logger *slog.Logger
|
||||
Cache cache.Cache
|
||||
ManagerOpts *ManagerOptions
|
||||
@@ -45,9 +46,10 @@ type PrepareTaskOptions struct {
|
||||
}
|
||||
|
||||
type PrepareTestRuleOptions struct {
|
||||
Rule *ruletypes.PostableRule
|
||||
RuleStore ruletypes.RuleStore
|
||||
Querier querier.Querier
|
||||
Rule *ruletypes.PostableRule
|
||||
RuleStore ruletypes.RuleStore
|
||||
MaintenanceStore ruletypes.MaintenanceStore
|
||||
Querier querier.Querier
|
||||
Logger *slog.Logger
|
||||
Cache cache.Cache
|
||||
ManagerOpts *ManagerOptions
|
||||
@@ -165,7 +167,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
rules = append(rules, tr)
|
||||
|
||||
// create ch rule task for evaluation
|
||||
task = newTask(TaskTypeCh, opts.TaskName, taskNameSuffix, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.OrgID)
|
||||
task = newTask(TaskTypeCh, opts.TaskName, taskNameSuffix, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
} else if opts.Rule.RuleType == ruletypes.RuleTypeProm {
|
||||
|
||||
@@ -189,7 +191,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
|
||||
rules = append(rules, pr)
|
||||
|
||||
// create promql rule task for evaluation
|
||||
task = newTask(TaskTypeProm, opts.TaskName, taskNameSuffix, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.OrgID)
|
||||
task = newTask(TaskTypeProm, opts.TaskName, taskNameSuffix, evaluation.GetFrequency().Duration(), rules, opts.ManagerOpts, opts.NotifyFunc, opts.MaintenanceStore, opts.OrgID)
|
||||
|
||||
} else {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported rule type %s. Supported types: %s, %s", opts.Rule.RuleType, ruletypes.RuleTypeProm, ruletypes.RuleTypeThreshold)
|
||||
@@ -430,8 +432,9 @@ func (m *Manager) editTask(_ context.Context, orgID valuer.UUID, rule *ruletypes
|
||||
newTask, err := m.prepareTaskFunc(PrepareTaskOptions{
|
||||
Rule: rule,
|
||||
TaskName: taskName,
|
||||
RuleStore: m.ruleStore,
|
||||
Querier: m.opts.Querier,
|
||||
RuleStore: m.ruleStore,
|
||||
MaintenanceStore: m.maintenanceStore,
|
||||
Querier: m.opts.Querier,
|
||||
Logger: m.opts.Logger,
|
||||
Cache: m.cache,
|
||||
ManagerOpts: m.opts,
|
||||
@@ -642,8 +645,9 @@ func (m *Manager) addTask(_ context.Context, orgID valuer.UUID, rule *ruletypes.
|
||||
newTask, err := m.prepareTaskFunc(PrepareTaskOptions{
|
||||
Rule: rule,
|
||||
TaskName: taskName,
|
||||
RuleStore: m.ruleStore,
|
||||
Querier: m.opts.Querier,
|
||||
RuleStore: m.ruleStore,
|
||||
MaintenanceStore: m.maintenanceStore,
|
||||
Querier: m.opts.Querier,
|
||||
Logger: m.opts.Logger,
|
||||
Cache: m.cache,
|
||||
ManagerOpts: m.opts,
|
||||
@@ -1026,8 +1030,9 @@ func (m *Manager) TestNotification(ctx context.Context, orgID valuer.UUID, ruleS
|
||||
|
||||
alertCount, err := m.prepareTestRuleFunc(PrepareTestRuleOptions{
|
||||
Rule: &parsedRule,
|
||||
RuleStore: m.ruleStore,
|
||||
Querier: m.opts.Querier,
|
||||
RuleStore: m.ruleStore,
|
||||
MaintenanceStore: m.maintenanceStore,
|
||||
Querier: m.opts.Querier,
|
||||
Logger: m.opts.Logger,
|
||||
Cache: m.cache,
|
||||
ManagerOpts: m.opts,
|
||||
|
||||
@@ -40,12 +40,13 @@ type PromRuleTask struct {
|
||||
logger *slog.Logger
|
||||
notify NotifyFunc
|
||||
|
||||
orgID valuer.UUID
|
||||
maintenanceStore ruletypes.MaintenanceStore
|
||||
orgID valuer.UUID
|
||||
}
|
||||
|
||||
// NewPromRuleTask holds rules that have promql condition
|
||||
// and evaluates the rule at a given frequency
|
||||
func NewPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, orgID valuer.UUID) *PromRuleTask {
|
||||
func NewPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, maintenanceStore ruletypes.MaintenanceStore, orgID valuer.UUID) *PromRuleTask {
|
||||
opts.Logger.Info("initiating a new rule group", "name", name, "frequency", frequency)
|
||||
|
||||
if frequency == 0 {
|
||||
@@ -62,9 +63,10 @@ func NewPromRuleTask(name, file string, frequency time.Duration, rules []Rule, o
|
||||
seriesInPreviousEval: make([]map[string]plabels.Labels, len(rules)),
|
||||
done: make(chan struct{}),
|
||||
terminated: make(chan struct{}),
|
||||
notify: notify,
|
||||
logger: opts.Logger,
|
||||
orgID: orgID,
|
||||
notify: notify,
|
||||
maintenanceStore: maintenanceStore,
|
||||
logger: opts.Logger,
|
||||
orgID: orgID,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -328,12 +330,30 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
|
||||
}()
|
||||
|
||||
g.logger.InfoContext(ctx, "promql rule task", "name", g.name, "eval_started_at", ts)
|
||||
maintenance, err := g.maintenanceStore.ListPlannedMaintenance(ctx, g.orgID.StringValue())
|
||||
if err != nil {
|
||||
g.logger.ErrorContext(ctx, "error in processing sql query", errors.Attr(err))
|
||||
}
|
||||
|
||||
for i, rule := range g.rules {
|
||||
if rule == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
shouldSkip := false
|
||||
for _, m := range maintenance {
|
||||
g.logger.InfoContext(ctx, "checking if rule should be skipped", slog.String("rule.id", rule.ID()), slog.Any("maintenance", m))
|
||||
if m.ShouldSkip(rule.ID(), ts) {
|
||||
shouldSkip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if shouldSkip {
|
||||
g.logger.InfoContext(ctx, "rule should be skipped", slog.String("rule.id", rule.ID()))
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
|
||||
@@ -38,13 +38,14 @@ type RuleTask struct {
|
||||
pause bool
|
||||
notify NotifyFunc
|
||||
|
||||
orgID valuer.UUID
|
||||
maintenanceStore ruletypes.MaintenanceStore
|
||||
orgID valuer.UUID
|
||||
}
|
||||
|
||||
const DefaultFrequency = 1 * time.Minute
|
||||
|
||||
// NewRuleTask makes a new RuleTask with the given name, options, and rules.
|
||||
func NewRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, orgID valuer.UUID) *RuleTask {
|
||||
func NewRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, maintenanceStore ruletypes.MaintenanceStore, orgID valuer.UUID) *RuleTask {
|
||||
|
||||
if frequency == 0 {
|
||||
frequency = DefaultFrequency
|
||||
@@ -61,8 +62,9 @@ func NewRuleTask(name, file string, frequency time.Duration, rules []Rule, opts
|
||||
logger: opts.Logger,
|
||||
done: make(chan struct{}),
|
||||
terminated: make(chan struct{}),
|
||||
notify: notify,
|
||||
orgID: orgID,
|
||||
notify: notify,
|
||||
maintenanceStore: maintenanceStore,
|
||||
orgID: orgID,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -316,11 +318,31 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
|
||||
|
||||
g.logger.DebugContext(ctx, "rule task eval started", "name", g.name, "start_time", ts)
|
||||
|
||||
maintenance, err := g.maintenanceStore.ListPlannedMaintenance(ctx, g.orgID.StringValue())
|
||||
|
||||
if err != nil {
|
||||
g.logger.ErrorContext(ctx, "error in processing sql query", errors.Attr(err))
|
||||
}
|
||||
|
||||
for i, rule := range g.rules {
|
||||
if rule == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
shouldSkip := false
|
||||
for _, m := range maintenance {
|
||||
g.logger.InfoContext(ctx, "checking if rule should be skipped", slog.String("rule.id", rule.ID()), slog.Any("maintenance", m))
|
||||
if m.ShouldSkip(rule.ID(), ts) {
|
||||
shouldSkip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if shouldSkip {
|
||||
g.logger.InfoContext(ctx, "rule should be skipped", slog.String("rule.id", rule.ID()))
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-g.done:
|
||||
return
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
@@ -31,9 +32,9 @@ type Task interface {
|
||||
|
||||
// newTask returns an appropriate group for
|
||||
// rule type
|
||||
func newTask(taskType TaskType, name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, orgID valuer.UUID) Task {
|
||||
func newTask(taskType TaskType, name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, maintenanceStore ruletypes.MaintenanceStore, orgID valuer.UUID) Task {
|
||||
if taskType == TaskTypeCh {
|
||||
return NewRuleTask(name, file, frequency, rules, opts, notify, orgID)
|
||||
return NewRuleTask(name, file, frequency, rules, opts, notify, maintenanceStore, orgID)
|
||||
}
|
||||
return NewPromRuleTask(name, file, frequency, rules, opts, notify, orgID)
|
||||
return NewPromRuleTask(name, file, frequency, rules, opts, notify, maintenanceStore, orgID)
|
||||
}
|
||||
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
|
||||
"github.com/SigNoz/signoz/pkg/sharder"
|
||||
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
@@ -39,8 +38,7 @@ func TestNewHandlers(t *testing.T) {
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
|
||||
notificationManager := nfmanagertest.NewMock()
|
||||
require.NoError(t, err)
|
||||
maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
|
||||
alertmanager, err := signozalertmanager.New(providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager, maintenanceStore)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager)
|
||||
require.NoError(t, err)
|
||||
tokenizer := tokenizertest.NewMockTokenizer(t)
|
||||
emailing := emailingtest.New()
|
||||
|
||||
@@ -20,7 +20,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/serviceaccount/implserviceaccount"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
|
||||
"github.com/SigNoz/signoz/pkg/sharder"
|
||||
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
@@ -40,8 +39,7 @@ func TestNewModules(t *testing.T) {
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
|
||||
notificationManager := nfmanagertest.NewMock()
|
||||
require.NoError(t, err)
|
||||
maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
|
||||
alertmanager, err := signozalertmanager.New(providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager, maintenanceStore)
|
||||
alertmanager, err := signozalertmanager.New(context.TODO(), providerSettings, alertmanager.Config{}, sqlstore, orgGetter, notificationManager)
|
||||
require.NoError(t, err)
|
||||
tokenizer := tokenizertest.NewMockTokenizer(t)
|
||||
emailing := emailingtest.New()
|
||||
|
||||
@@ -65,7 +65,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/tokenizer/tokenizerstore/sqltokenizerstore"
|
||||
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/featuretypes"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/version"
|
||||
"github.com/SigNoz/signoz/pkg/web"
|
||||
"github.com/SigNoz/signoz/pkg/web/noopweb"
|
||||
@@ -222,14 +221,9 @@ func NewNotificationManagerProviderFactories(routeStore alertmanagertypes.RouteS
|
||||
)
|
||||
}
|
||||
|
||||
func NewAlertmanagerProviderFactories(
|
||||
sqlstore sqlstore.SQLStore,
|
||||
orgGetter organization.Getter,
|
||||
nfManager nfmanager.NotificationManager,
|
||||
maintenanceStore ruletypes.MaintenanceStore,
|
||||
) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
|
||||
func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore, orgGetter organization.Getter, nfManager nfmanager.NotificationManager) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
signozalertmanager.NewFactory(sqlstore, orgGetter, nfManager, maintenanceStore),
|
||||
signozalertmanager.NewFactory(sqlstore, orgGetter, nfManager),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema/sqlschematest"
|
||||
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore/sqlstoretest"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
@@ -60,11 +59,9 @@ func TestNewProviderFactories(t *testing.T) {
|
||||
})
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
store := sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(store), nil)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
|
||||
notificationManager := nfmanagertest.NewMock()
|
||||
maintenanceStore := sqlrulestore.NewMaintenanceStore(store)
|
||||
NewAlertmanagerProviderFactories(store, orgGetter, notificationManager, maintenanceStore)
|
||||
NewAlertmanagerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), orgGetter, notificationManager)
|
||||
})
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
|
||||
@@ -34,7 +34,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
"github.com/SigNoz/signoz/pkg/ruler"
|
||||
sqlrulestore "github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
|
||||
"github.com/SigNoz/signoz/pkg/sharder"
|
||||
"github.com/SigNoz/signoz/pkg/sqlmigration"
|
||||
"github.com/SigNoz/signoz/pkg/sqlmigrator"
|
||||
@@ -363,14 +362,12 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
|
||||
|
||||
// Initialize alertmanager from the available alertmanager provider factories
|
||||
alertmanager, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
providerSettings,
|
||||
config.Alertmanager,
|
||||
NewAlertmanagerProviderFactories(sqlstore, orgGetter, nfManager, maintenanceStore),
|
||||
NewAlertmanagerProviderFactories(sqlstore, orgGetter, nfManager),
|
||||
config.Alertmanager.Provider,
|
||||
)
|
||||
if err != nil {
|
||||
|
||||
@@ -11,7 +11,9 @@ import (
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
var ErrCodeInvalidPlannedMaintenancePayload = errors.MustNewCode("invalid_planned_maintenance_payload")
|
||||
var (
|
||||
ErrCodeInvalidPlannedMaintenancePayload = errors.MustNewCode("invalid_planned_maintenance_payload")
|
||||
)
|
||||
|
||||
type MaintenanceStatus struct {
|
||||
valuer.String
|
||||
@@ -61,15 +63,15 @@ type StorablePlannedMaintenance struct {
|
||||
}
|
||||
|
||||
type PlannedMaintenance struct {
|
||||
ID valuer.UUID `json:"id" required:"true"`
|
||||
Name string `json:"name" required:"true"`
|
||||
Description string `json:"description"`
|
||||
Schedule *Schedule `json:"schedule" required:"true"`
|
||||
RuleIDs []string `json:"alertIds"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
CreatedBy string `json:"createdBy"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
UpdatedBy string `json:"updatedBy"`
|
||||
ID valuer.UUID `json:"id" required:"true"`
|
||||
Name string `json:"name" required:"true"`
|
||||
Description string `json:"description"`
|
||||
Schedule *Schedule `json:"schedule" required:"true"`
|
||||
RuleIDs []string `json:"alertIds"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
CreatedBy string `json:"createdBy"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
UpdatedBy string `json:"updatedBy"`
|
||||
Status MaintenanceStatus `json:"status" required:"true"`
|
||||
Kind MaintenanceKind `json:"kind" required:"true"`
|
||||
}
|
||||
@@ -159,11 +161,8 @@ func (m *PlannedMaintenance) ShouldSkip(ruleID string, now time.Time) bool {
|
||||
|
||||
currentTime := now.In(loc)
|
||||
|
||||
// fixed schedule — only when no recurrence is configured.
|
||||
// When recurrence is set, the recurring check below handles everything;
|
||||
// falling through here would cause the window to match the absolute
|
||||
// StartTime–EndTime range instead of the daily/weekly/monthly pattern.
|
||||
if m.Schedule.Recurrence == nil && !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
|
||||
// fixed schedule
|
||||
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
|
||||
startTime := m.Schedule.StartTime.In(loc)
|
||||
endTime := m.Schedule.EndTime.In(loc)
|
||||
if currentTime.Equal(startTime) || currentTime.Equal(endTime) ||
|
||||
@@ -334,11 +333,6 @@ func (m *PlannedMaintenance) Validate() error {
|
||||
}
|
||||
|
||||
if m.Schedule.Recurrence != nil {
|
||||
// Fixed range and recurrence are mutually exclusive.
|
||||
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload,
|
||||
"cannot set both a fixed time range (startTime/endTime) and a recurrence; use one or the other")
|
||||
}
|
||||
if m.Schedule.Recurrence.RepeatType.IsZero() {
|
||||
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "missing repeat type in the payload")
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package ruletypes
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -14,6 +13,7 @@ func timePtr(t time.Time) *time.Time {
|
||||
}
|
||||
|
||||
func TestShouldSkipMaintenance(t *testing.T) {
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
maintenance *PlannedMaintenance
|
||||
@@ -499,7 +499,7 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 1, 12, 10, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 04, 1, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
{
|
||||
@@ -508,14 +508,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeWeekly,
|
||||
RepeatOn: []RepeatOn{RepeatOnMonday},
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 15, 12, 10, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 04, 15, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
{
|
||||
@@ -524,14 +524,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeWeekly,
|
||||
RepeatOn: []RepeatOn{RepeatOnMonday},
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 14, 12, 10, 0, 0, time.UTC), // 14th 04 is sunday
|
||||
ts: time.Date(2024, 04, 14, 12, 10, 0, 0, time.UTC), // 14th 04 is sunday
|
||||
skip: false,
|
||||
},
|
||||
{
|
||||
@@ -540,14 +540,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeWeekly,
|
||||
RepeatOn: []RepeatOn{RepeatOnMonday},
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 16, 12, 10, 0, 0, time.UTC), // 16th 04 is tuesday
|
||||
ts: time.Date(2024, 04, 16, 12, 10, 0, 0, time.UTC), // 16th 04 is tuesday
|
||||
skip: false,
|
||||
},
|
||||
{
|
||||
@@ -556,14 +556,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeWeekly,
|
||||
RepeatOn: []RepeatOn{RepeatOnMonday},
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 5, 6, 12, 10, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 05, 06, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
{
|
||||
@@ -572,14 +572,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeWeekly,
|
||||
RepeatOn: []RepeatOn{RepeatOnMonday},
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 5, 6, 14, 0, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 05, 06, 14, 00, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
{
|
||||
@@ -588,13 +588,13 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeMonthly,
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 4, 12, 10, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 04, 04, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
{
|
||||
@@ -603,13 +603,13 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeMonthly,
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 4, 14, 10, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 04, 04, 14, 10, 0, 0, time.UTC),
|
||||
skip: false,
|
||||
},
|
||||
{
|
||||
@@ -618,53 +618,13 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeMonthly,
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 5, 4, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
// #7548: recurring daily with Schedule.StartTime/EndTime also set.
|
||||
// The recurrence should govern — not the fixed range.
|
||||
{
|
||||
name: "recurring-daily-with-fixed-times-outside-daily-window",
|
||||
maintenance: &PlannedMaintenance{
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
// These fixed fields should be ignored when Recurrence is set.
|
||||
StartTime: time.Date(2024, 6, 1, 10, 0, 0, 0, time.UTC),
|
||||
EndTime: time.Date(2024, 6, 30, 18, 0, 0, 0, time.UTC),
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 6, 1, 14, 0, 0, 0, time.UTC), // daily at 14:00
|
||||
Duration: valuer.MustParseTextDuration("2h"), // until 16:00
|
||||
RepeatType: RepeatTypeDaily,
|
||||
},
|
||||
},
|
||||
},
|
||||
// 11:00 is inside the fixed range but outside the daily 14:00-16:00 window.
|
||||
// Before the fix this returned true (bug); after fix it returns false.
|
||||
ts: time.Date(2024, 6, 15, 11, 0, 0, 0, time.UTC),
|
||||
skip: false,
|
||||
},
|
||||
{
|
||||
name: "recurring-daily-with-fixed-times-inside-daily-window",
|
||||
maintenance: &PlannedMaintenance{
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
StartTime: time.Date(2024, 6, 1, 10, 0, 0, 0, time.UTC),
|
||||
EndTime: time.Date(2024, 6, 30, 18, 0, 0, 0, time.UTC),
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 6, 1, 14, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeDaily,
|
||||
},
|
||||
},
|
||||
},
|
||||
// 15:00 is inside the daily 14:00-16:00 window — should skip.
|
||||
ts: time.Date(2024, 6, 15, 15, 0, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 05, 04, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
}
|
||||
@@ -676,27 +636,3 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #7548: Validate rejects payloads that set both fixed range and recurrence.
|
||||
func TestValidate_FixedAndRecurrenceMutuallyExclusive(t *testing.T) {
|
||||
m := PlannedMaintenance{
|
||||
Name: "test",
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
StartTime: time.Now().Add(-time.Hour),
|
||||
EndTime: time.Now().Add(time.Hour),
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Now().Add(-time.Hour),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeDaily,
|
||||
},
|
||||
},
|
||||
}
|
||||
err := m.Validate()
|
||||
if err == nil {
|
||||
t.Fatal("expected validation error when both fixed range and recurrence are set")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "cannot set both") {
|
||||
t.Errorf("unexpected error message: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user