Mirror of https://github.com/SigNoz/signoz.git (synced 2026-02-03 08:33:26 +00:00)

chore: add ability to delay the evaluation start for new groups (#9621)

committed by GitHub
parent b38ffce7f2
commit 0a81bf8060
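
For orientation, a minimal sketch (not part of this commit, illustrative names only) of the check this change introduces: a series whose alert group was first seen less than newGroupEvalDelay ago is skipped at evaluation time. Timestamps are epoch milliseconds, matching the first_seen values the rule reads from metric metadata; the same comparison appears later in BaseRule.FilterNewSeries.

package main

import (
	"fmt"
	"time"
)

// withinGracePeriod reports whether a group first seen at firstSeenMs (epoch
// milliseconds) is still inside the newGroupEvalDelay window at evalTime.
func withinGracePeriod(firstSeenMs int64, evalTime time.Time, newGroupEvalDelay time.Duration) bool {
	return firstSeenMs+newGroupEvalDelay.Milliseconds() > evalTime.UnixMilli()
}

func main() {
	evalTime := time.Unix(1700000000, 0)
	delay := 2 * time.Minute

	oldGroup := evalTime.Add(-2 * delay).UnixMilli() // first seen well before the window
	newGroup := evalTime.Add(-delay / 2).UnixMilli() // first seen inside the window

	fmt.Println(withinGracePeriod(oldGroup, evalTime, delay)) // false -> evaluate as usual
	fmt.Println(withinGracePeriod(newGroup, evalTime, delay)) // true  -> skip this evaluation
}
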
@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
"go.opentelemetry.io/otel/propagation"

@@ -104,6 +105,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
signoz.Alertmanager,
signoz.SQLStore,
signoz.TelemetryStore,
signoz.TelemetryMetadataStore,
signoz.Prometheus,
signoz.Modules.OrgGetter,
signoz.Querier,
@@ -355,12 +357,13 @@ func (s *Server) Stop(ctx context.Context) error {
return nil
}

func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, querier querier.Querier, providerSettings factory.ProviderSettings, queryParser queryparser.QueryParser) (*baserules.Manager, error) {
func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertmanager.Alertmanager, sqlstore sqlstore.SQLStore, telemetryStore telemetrystore.TelemetryStore, metadataStore telemetrytypes.MetadataStore, prometheus prometheus.Prometheus, orgGetter organization.Getter, querier querier.Querier, providerSettings factory.ProviderSettings, queryParser queryparser.QueryParser) (*baserules.Manager, error) {
ruleStore := sqlrulestore.NewRuleStore(sqlstore, queryParser, providerSettings)
maintenanceStore := sqlrulestore.NewMaintenanceStore(sqlstore)
// create manager opts
managerOpts := &baserules.ManagerOptions{
TelemetryStore: telemetryStore,
MetadataStore: metadataStore,
Prometheus: prometheus,
Context: context.Background(),
Logger: zap.L(),
@@ -376,6 +379,7 @@ func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertma
RuleStore: ruleStore,
MaintenanceStore: maintenanceStore,
SqlStore: sqlstore,
QueryParser: queryParser,
}

// create Manager

@@ -292,7 +292,19 @@ func (r *AnomalyRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUID,
scoresJSON, _ := json.Marshal(queryResult.AnomalyScores)
r.logger.InfoContext(ctx, "anomaly scores", "scores", string(scoresJSON))

for _, series := range queryResult.AnomalyScores {
// Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.AnomalyScores
if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
// In case of error we log the error and continue with the original series
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
} else {
seriesToProcess = filteredSeries
}
}

for _, series := range seriesToProcess {
if r.Condition().RequireMinPoints {
if len(series.Points) < r.Condition().RequiredNumPoints {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)

@@ -37,6 +37,8 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.SLogger,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
baserules.WithMetadataStore(opts.ManagerOpts.MetadataStore),
)

if err != nil {
@@ -59,6 +61,8 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Reader,
opts.ManagerOpts.Prometheus,
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
baserules.WithMetadataStore(opts.ManagerOpts.MetadataStore),
)

if err != nil {
@@ -82,6 +86,8 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Cache,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
baserules.WithMetadataStore(opts.ManagerOpts.MetadataStore),
)
if err != nil {
return task, err
@@ -140,6 +146,8 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
baserules.WithMetadataStore(opts.ManagerOpts.MetadataStore),
)

if err != nil {
@@ -160,6 +168,8 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
baserules.WithMetadataStore(opts.ManagerOpts.MetadataStore),
)

if err != nil {
@@ -179,6 +189,8 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
baserules.WithMetadataStore(opts.ManagerOpts.MetadataStore),
)
if err != nil {
zap.L().Error("failed to prepare a new anomaly rule for test", zap.String("name", alertname), zap.Error(err))

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/ruler/rulestore/sqlrulestore"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"

"github.com/gorilla/handlers"

@@ -104,6 +105,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
signoz.Alertmanager,
signoz.SQLStore,
signoz.TelemetryStore,
signoz.TelemetryMetadataStore,
signoz.Prometheus,
signoz.Modules.OrgGetter,
signoz.Querier,
@@ -337,6 +339,7 @@ func makeRulesManager(
alertmanager alertmanager.Alertmanager,
sqlstore sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
metadataStore telemetrytypes.MetadataStore,
prometheus prometheus.Prometheus,
orgGetter organization.Getter,
querier querier.Querier,
@@ -348,6 +351,7 @@ func makeRulesManager(
// create manager opts
managerOpts := &rules.ManagerOptions{
TelemetryStore: telemetryStore,
MetadataStore: metadataStore,
Prometheus: prometheus,
Context: context.Background(),
Logger: zap.L(),
@@ -361,6 +365,7 @@ func makeRulesManager(
RuleStore: ruleStore,
MaintenanceStore: maintenanceStore,
SqlStore: sqlstore,
QueryParser: queryParser,
}

// create Manager

@@ -13,8 +13,11 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
qslabels "github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/sqlstore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
)
@@ -87,7 +90,14 @@ type BaseRule struct {

sqlstore sqlstore.SQLStore

metadataStore telemetrytypes.MetadataStore

evaluation ruletypes.Evaluation

// newGroupEvalDelay is the grace period for new alert groups
newGroupEvalDelay *time.Duration

queryParser queryparser.QueryParser
}

type RuleOption func(*BaseRule)
@@ -122,6 +132,18 @@ func WithSQLStore(sqlstore sqlstore.SQLStore) RuleOption {
}
}

func WithQueryParser(queryParser queryparser.QueryParser) RuleOption {
return func(r *BaseRule) {
r.queryParser = queryParser
}
}

func WithMetadataStore(metadataStore telemetrytypes.MetadataStore) RuleOption {
return func(r *BaseRule) {
r.metadataStore = metadataStore
}
}

func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) {
if p.RuleCondition == nil || !p.RuleCondition.IsValid() {
return nil, fmt.Errorf("invalid rule condition")
@@ -154,6 +176,12 @@ func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, reader
evaluation: evaluation,
}

// Store newGroupEvalDelay and groupBy keys from NotificationSettings
if p.NotificationSettings != nil && p.NotificationSettings.NewGroupEvalDelay != nil {
newGroupEvalDelay := time.Duration(*p.NotificationSettings.NewGroupEvalDelay)
baseRule.newGroupEvalDelay = &newGroupEvalDelay
}

if baseRule.evalWindow == 0 {
baseRule.evalWindow = 5 * time.Minute
}
@@ -535,3 +563,197 @@ func (r *BaseRule) PopulateTemporality(ctx context.Context, orgID valuer.UUID, q
}
return nil
}

// ShouldSkipNewGroups returns true if new group filtering should be applied
func (r *BaseRule) ShouldSkipNewGroups() bool {
return r.newGroupEvalDelay != nil && *r.newGroupEvalDelay > 0
}

// isFilterNewSeriesSupported checks if the query is supported for new series filtering
func (r *BaseRule) isFilterNewSeriesSupported() bool {
if r.ruleCondition.CompositeQuery.QueryType == v3.QueryTypeBuilder {
for _, query := range r.ruleCondition.CompositeQuery.Queries {
if query.Type != qbtypes.QueryTypeBuilder {
continue
}
switch query.Spec.(type) {
// query spec is for Logs or Traces, return with blank metric names and group by fields
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
return false
}
}
}
return true
}

// extractMetricAndGroupBys extracts metric names and groupBy keys from the rule's query.
// Returns a map where key is the metric name and value is the list of groupBy keys associated with it.
// TODO: implement caching for query parsing results to avoid re-parsing the query + cache invalidation
func (r *BaseRule) extractMetricAndGroupBys(ctx context.Context) (map[string][]string, error) {
metricToGroupedFields := make(map[string][]string)

// check to avoid processing the query for Logs and Traces
// as excluding new series is not supported for Logs and Traces for now
if !r.isFilterNewSeriesSupported() {
return metricToGroupedFields, nil
}

results, err := r.queryParser.AnalyzeQueryEnvelopes(ctx, r.ruleCondition.CompositeQuery.Queries)
if err != nil {
return nil, err
}

// temp map to avoid duplicates group by fields for the same metric
// map[metricName]map[groupKey]struct{}
tempMap := make(map[string]map[string]struct{})

// Aggregate results from all queries
for _, result := range results {
if len(result.MetricNames) == 0 {
continue
}
// Collect unique groupBy columns for this query result
uniqueGroups := make(map[string]struct{})
for _, col := range result.GroupByColumns {
uniqueGroups[col.GroupName()] = struct{}{}
}
// walk through the metric names and group by fields for this query result and add them to the temp map
for _, metricName := range result.MetricNames {
if _, ok := tempMap[metricName]; !ok {
tempMap[metricName] = make(map[string]struct{})
}
for groupKey := range uniqueGroups {
tempMap[metricName][groupKey] = struct{}{}
}
}
}

// Convert to final map
for metricName, groups := range tempMap {
for groupKey := range groups {
metricToGroupedFields[metricName] = append(metricToGroupedFields[metricName], groupKey)
}
}

return metricToGroupedFields, nil
}

// FilterNewSeries filters out items that are too new based on metadata first_seen timestamps.
// Returns the filtered series (old ones) excluding new series that are still within the grace period.
func (r *BaseRule) FilterNewSeries(ctx context.Context, ts time.Time, series []*v3.Series) ([]*v3.Series, error) {
// Extract metric names and groupBy keys
metricToGroupedFields, err := r.extractMetricAndGroupBys(ctx)
if err != nil {
return nil, err
}

if len(metricToGroupedFields) == 0 {
// No metrics or groupBy keys, nothing to filter (non-ideal case, return all series)
return series, nil
}

// Build lookup keys from series which will be used to query metadata from CH
lookupKeys := make([]telemetrytypes.MetricMetadataLookupKey, 0)
seriesIdxToLookupKeys := make(map[int][]telemetrytypes.MetricMetadataLookupKey) // series index -> lookup keys

for i := 0; i < len(series); i++ {
metricLabelMap := series[i].Labels

// Collect groupBy attribute-value pairs for this series
seriesKeys := make([]telemetrytypes.MetricMetadataLookupKey, 0)

for metricName, groupedFields := range metricToGroupedFields {
for _, groupByKey := range groupedFields {
if attrValue, ok := metricLabelMap[groupByKey]; ok {
lookupKey := telemetrytypes.MetricMetadataLookupKey{
MetricName: metricName,
AttributeName: groupByKey,
AttributeValue: attrValue,
}
lookupKeys = append(lookupKeys, lookupKey)
seriesKeys = append(seriesKeys, lookupKey)
}
}
}

if len(seriesKeys) > 0 {
seriesIdxToLookupKeys[i] = seriesKeys
}
}

if len(lookupKeys) == 0 {
// No lookup keys to query, return all series
// this can happen when the series has no labels at all
// in that case, we include all series as we don't know if it is new or old series
return series, nil
}

// unique lookup keys
uniqueLookupKeysMap := make(map[telemetrytypes.MetricMetadataLookupKey]struct{})
uniqueLookupKeys := make([]telemetrytypes.MetricMetadataLookupKey, 0)
for _, key := range lookupKeys {
if _, ok := uniqueLookupKeysMap[key]; !ok {
uniqueLookupKeysMap[key] = struct{}{}
uniqueLookupKeys = append(uniqueLookupKeys, key)
}
}
// Query metadata for first_seen timestamps
firstSeenMap, err := r.metadataStore.GetFirstSeenFromMetricMetadata(ctx, uniqueLookupKeys)
if err != nil {
return nil, err
}

// Filter series based on first_seen + delay
filteredSeries := make([]*v3.Series, 0, len(series))
evalTimeMs := ts.UnixMilli()
newGroupEvalDelayMs := r.newGroupEvalDelay.Milliseconds()

for i := 0; i < len(series); i++ {
seriesKeys, ok := seriesIdxToLookupKeys[i]
if !ok {
// No matching labels used in groupBy from this series, include it
// as we can't decide if it is new or old series
filteredSeries = append(filteredSeries, series[i])
continue
}

// Find the maximum first_seen across all groupBy attributes for this series
// if the latest is old enough we're good, if latest is new we need to skip it
maxFirstSeen := int64(0)
// metadataFound tracks if we have metadata for any of the lookup keys
metadataFound := false

for _, lookupKey := range seriesKeys {
if firstSeen, exists := firstSeenMap[lookupKey]; exists {
metadataFound = true
if firstSeen > maxFirstSeen {
maxFirstSeen = firstSeen
}
}
}

// if we don't have metadata for any of the lookup keys, we can't decide if it is new or old series
// in that case, we include it
if !metadataFound {
filteredSeries = append(filteredSeries, series[i])
continue
}

// Check if first_seen + delay has passed
if maxFirstSeen+newGroupEvalDelayMs > evalTimeMs {
// Still within grace period, skip this series
r.logger.InfoContext(ctx, "Skipping new series", "rule_name", r.Name(), "series_idx", i, "max_first_seen", maxFirstSeen, "eval_time_ms", evalTimeMs, "delay_ms", newGroupEvalDelayMs, "labels", series[i].Labels)
continue
}

// Old enough, include this series
filteredSeries = append(filteredSeries, series[i])
}

skippedCount := len(series) - len(filteredSeries)
if skippedCount > 0 {
r.logger.InfoContext(ctx, "Filtered new series", "rule_name", r.Name(), "skipped_count", skippedCount, "total_count", len(series), "delay_ms", newGroupEvalDelayMs)
}

return filteredSeries, nil
}

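A worked sketch (assumed values, not taken from this diff) of how the per-series decision above behaves when a series carries several groupBy labels: the newest (maximum) first_seen among the matched lookup keys drives the decision, so a series is evaluated only once every one of its groupBy attribute values has existed for longer than the configured delay.

package main

import (
	"fmt"
	"time"
)

func main() {
	evalTime := time.Unix(1700000000, 0)
	delayMs := (2 * time.Minute).Milliseconds()

	// Hypothetical first_seen values (epoch ms) for one series grouped by
	// service_name and env.
	firstSeen := map[string]int64{
		"service_name=svc1": evalTime.Add(-10 * time.Minute).UnixMilli(), // long-established attribute value
		"env=prod":          evalTime.Add(-1 * time.Minute).UnixMilli(),  // newly seen attribute value
	}

	// The decision uses the maximum (newest) first_seen across the series' keys.
	maxFirstSeen := int64(0)
	for _, fs := range firstSeen {
		if fs > maxFirstSeen {
			maxFirstSeen = fs
		}
	}

	skip := maxFirstSeen+delayMs > evalTime.UnixMilli()
	fmt.Println(skip) // true: "env=prod" is still within the delay, so the series is skipped
}
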
@@ -1,12 +1,29 @@
|
||||
package rules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/cache"
|
||||
"github.com/SigNoz/signoz/pkg/cache/cachetest"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||
"github.com/SigNoz/signoz/pkg/prometheus/prometheustest"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/clickhouseReader"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
func TestBaseRule_RequireMinPoints(t *testing.T) {
|
||||
@@ -81,3 +98,742 @@ func TestBaseRule_RequireMinPoints(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// createTestSeries creates a *v3.Series with the given labels and optional points
|
||||
// so we don't exactly need the points in the series because the labels are used to determine if the series is new or old
|
||||
// we use the labels to create a lookup key for the series and then check the first_seen timestamp for the series in the metadata table
|
||||
func createTestSeries(labels map[string]string, points []v3.Point) *v3.Series {
|
||||
if points == nil {
|
||||
points = []v3.Point{}
|
||||
}
|
||||
return &v3.Series{
|
||||
Labels: labels,
|
||||
Points: points,
|
||||
}
|
||||
}
|
||||
|
||||
// seriesEqual compares two v3.Series by their labels
|
||||
// Returns true if the series have the same labels (order doesn't matter)
|
||||
func seriesEqual(s1, s2 *v3.Series) bool {
|
||||
if s1 == nil && s2 == nil {
|
||||
return true
|
||||
}
|
||||
if s1 == nil || s2 == nil {
|
||||
return false
|
||||
}
|
||||
if len(s1.Labels) != len(s2.Labels) {
|
||||
return false
|
||||
}
|
||||
for k, v := range s1.Labels {
|
||||
if s2.Labels[k] != v {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// calculateFirstSeen calculates first_seen timestamp based on evalTime, delay, and isOld flag
|
||||
func calculateFirstSeen(evalTime time.Time, delay time.Duration, isOld bool) int64 {
|
||||
if isOld {
|
||||
// Old: evalTime - (2 * delay)
|
||||
return evalTime.Add(-2 * delay).UnixMilli()
|
||||
}
|
||||
// New: evalTime - (delay / 2)
|
||||
return evalTime.Add(-delay / 2).UnixMilli()
|
||||
}
|
||||
|
||||
// createFirstSeenMap creates a first_seen map for a series with given attributes
|
||||
// metricName: the metric name
|
||||
// groupByFields: list of groupBy field names
|
||||
// evalTime: evaluation time
|
||||
// delay: newGroupEvalDelay
|
||||
// isOld: whether the series is old (true) or new (false)
|
||||
// attributeValues: values for each groupBy field in order
|
||||
func createFirstSeenMap(metricName string, groupByFields []string, evalTime time.Time, delay time.Duration, isOld bool, attributeValues ...string) map[telemetrytypes.MetricMetadataLookupKey]int64 {
|
||||
result := make(map[telemetrytypes.MetricMetadataLookupKey]int64)
|
||||
firstSeen := calculateFirstSeen(evalTime, delay, isOld)
|
||||
|
||||
for i, field := range groupByFields {
|
||||
if i < len(attributeValues) {
|
||||
key := telemetrytypes.MetricMetadataLookupKey{
|
||||
MetricName: metricName,
|
||||
AttributeName: field,
|
||||
AttributeValue: attributeValues[i],
|
||||
}
|
||||
result[key] = firstSeen
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// mergeFirstSeenMaps merges multiple first_seen maps into one
|
||||
// When the same key exists in multiple maps, it keeps the lowest value
|
||||
which simulates the behavior of the ClickHouse query
|
||||
// finding the minimum first_seen timestamp across all groupBy attributes for a single series
|
||||
func mergeFirstSeenMaps(maps ...map[telemetrytypes.MetricMetadataLookupKey]int64) map[telemetrytypes.MetricMetadataLookupKey]int64 {
|
||||
result := make(map[telemetrytypes.MetricMetadataLookupKey]int64)
|
||||
for _, m := range maps {
|
||||
for k, v := range m {
|
||||
if existingValue, exists := result[k]; exists {
|
||||
// Keep the lowest value
|
||||
if v < existingValue {
|
||||
result[k] = v
|
||||
}
|
||||
} else {
|
||||
result[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// createPostableRule creates a PostableRule with the given CompositeQuery
|
||||
func createPostableRule(compositeQuery *v3.CompositeQuery) ruletypes.PostableRule {
|
||||
return ruletypes.PostableRule{
|
||||
AlertName: "Test Rule",
|
||||
AlertType: ruletypes.AlertTypeMetric,
|
||||
RuleType: ruletypes.RuleTypeThreshold,
|
||||
Evaluation: &ruletypes.EvaluationEnvelope{
|
||||
Kind: ruletypes.RollingEvaluation,
|
||||
Spec: ruletypes.RollingWindow{
|
||||
EvalWindow: ruletypes.Duration(5 * time.Minute),
|
||||
Frequency: ruletypes.Duration(1 * time.Minute),
|
||||
},
|
||||
},
|
||||
RuleCondition: &ruletypes.RuleCondition{
|
||||
CompositeQuery: compositeQuery,
|
||||
Thresholds: &ruletypes.RuleThresholdData{
|
||||
Kind: ruletypes.BasicThresholdKind,
|
||||
Spec: ruletypes.BasicRuleThresholds{
|
||||
{
|
||||
Name: "test-threshold",
|
||||
TargetValue: func() *float64 { v := 1.0; return &v }(),
|
||||
CompareOp: ruletypes.ValueIsAbove,
|
||||
MatchType: ruletypes.AtleastOnce,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// filterNewSeriesTestCase represents a test case for FilterNewSeries
|
||||
type filterNewSeriesTestCase struct {
|
||||
name string
|
||||
compositeQuery *v3.CompositeQuery
|
||||
series []*v3.Series
|
||||
firstSeenMap map[telemetrytypes.MetricMetadataLookupKey]int64
|
||||
newGroupEvalDelay *time.Duration
|
||||
evalTime time.Time
|
||||
expectedFiltered []*v3.Series // series that should be in the final filtered result (old enough)
|
||||
expectError bool
|
||||
}
|
||||
|
||||
func TestBaseRule_FilterNewSeries(t *testing.T) {
|
||||
defaultEvalTime := time.Unix(1700000000, 0)
|
||||
defaultDelay := 2 * time.Minute
|
||||
defaultGroupByFields := []string{"service_name", "env"}
|
||||
|
||||
logger := instrumentationtest.New().Logger()
|
||||
settings := instrumentationtest.New().ToProviderSettings()
|
||||
|
||||
tests := []filterNewSeriesTestCase{
|
||||
{
|
||||
name: "mixed old and new series - Builder query",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "request_total",
|
||||
TimeAggregation: metrictypes.TimeAggregationCount,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc-new", "env": "prod"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc-missing", "env": "stage"}, nil),
|
||||
},
|
||||
firstSeenMap: mergeFirstSeenMaps(
|
||||
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc-old", "prod"),
|
||||
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, false, "svc-new", "prod"),
|
||||
// svc-missing has no metadata, so it will be included
|
||||
),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc-missing", "env": "stage"}, nil),
|
||||
}, // svc-old and svc-missing should be included; svc-new is filtered out
|
||||
},
|
||||
{
|
||||
name: "all new series - PromQL query",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypePromQL,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypePromQL,
|
||||
Spec: qbtypes.PromQuery{
|
||||
Name: "P1",
|
||||
Query: "sum by (service_name,env) (rate(request_total[5m]))",
|
||||
Disabled: false,
|
||||
Step: qbtypes.Step{Duration: 0},
|
||||
Stats: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc-new1", "env": "prod"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc-new2", "env": "stage"}, nil),
|
||||
},
|
||||
firstSeenMap: mergeFirstSeenMaps(
|
||||
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, false, "svc-new1", "prod"),
|
||||
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, false, "svc-new2", "stage"),
|
||||
),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{}, // all should be filtered out (new series)
|
||||
},
|
||||
{
|
||||
name: "all old series - ClickHouse query",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeClickHouseSQL,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeClickHouseSQL,
|
||||
Spec: qbtypes.ClickHouseQuery{
|
||||
Name: "CH1",
|
||||
Query: "SELECT service_name, env FROM metrics WHERE metric_name='request_total' GROUP BY service_name, env",
|
||||
Disabled: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc-old1", "env": "prod"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc-old2", "env": "stage"}, nil),
|
||||
},
|
||||
firstSeenMap: mergeFirstSeenMaps(
|
||||
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc-old1", "prod"),
|
||||
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc-old2", "stage"),
|
||||
),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc-old1", "env": "prod"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc-old2", "env": "stage"}, nil),
|
||||
}, // all should be included (old series)
|
||||
},
|
||||
{
|
||||
name: "no grouping in query - Builder",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "request_total",
|
||||
TimeAggregation: metrictypes.TimeAggregationCount,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
|
||||
},
|
||||
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
|
||||
}, // early return, no filtering - all series included
|
||||
},
|
||||
{
|
||||
name: "no metric names - Builder",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Aggregations: []qbtypes.MetricAggregation{},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
|
||||
},
|
||||
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
|
||||
}, // early return, no filtering - all series included
|
||||
},
|
||||
{
|
||||
name: "series with no matching labels - Builder",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "request_total",
|
||||
TimeAggregation: metrictypes.TimeAggregationCount,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"status": "200"}, nil), // no service_name or env
|
||||
},
|
||||
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"status": "200"}, nil),
|
||||
}, // series included as we can't decide if it's new or old
|
||||
},
|
||||
{
|
||||
name: "series with missing metadata - PromQL",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypePromQL,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypePromQL,
|
||||
Spec: qbtypes.PromQuery{
|
||||
Name: "P1",
|
||||
Query: "sum by (service_name,env) (rate(request_total[5m]))",
|
||||
Disabled: false,
|
||||
Step: qbtypes.Step{Duration: 0},
|
||||
Stats: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc-no-metadata", "env": "prod"}, nil),
|
||||
},
|
||||
firstSeenMap: createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc-old", "prod"),
|
||||
// svc-no-metadata has no entry in firstSeenMap
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc-old", "env": "prod"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc-no-metadata", "env": "prod"}, nil),
|
||||
}, // both should be included - svc-old is old, svc-no-metadata can't be decided
|
||||
},
|
||||
{
|
||||
name: "series with partial metadata - ClickHouse",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeClickHouseSQL,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeClickHouseSQL,
|
||||
Spec: qbtypes.ClickHouseQuery{
|
||||
Name: "CH1",
|
||||
Query: "SELECT service_name, env FROM metrics WHERE metric_name='request_total' GROUP BY service_name, env",
|
||||
Disabled: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc-partial", "env": "prod"}, nil),
|
||||
},
|
||||
// Only provide metadata for service_name, not env
|
||||
firstSeenMap: map[telemetrytypes.MetricMetadataLookupKey]int64{
|
||||
{MetricName: "request_total", AttributeName: "service_name", AttributeValue: "svc-partial"}: calculateFirstSeen(defaultEvalTime, defaultDelay, true),
|
||||
// env metadata is missing
|
||||
},
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc-partial", "env": "prod"}, nil),
|
||||
}, // has some metadata, uses max first_seen which is old
|
||||
},
|
||||
{
|
||||
name: "empty series array - Builder",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "request_total",
|
||||
TimeAggregation: metrictypes.TimeAggregationCount,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{},
|
||||
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{},
|
||||
},
|
||||
{
|
||||
name: "zero delay - Builder",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "request_total",
|
||||
TimeAggregation: metrictypes.TimeAggregationCount,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
|
||||
},
|
||||
firstSeenMap: createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc1", "prod"),
|
||||
newGroupEvalDelay: func() *time.Duration { d := time.Duration(0); return &d }(), // zero delay
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
|
||||
}, // with zero delay, all series pass
|
||||
},
|
||||
{
|
||||
name: "multiple metrics with same groupBy keys - Builder",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "request_total",
|
||||
TimeAggregation: metrictypes.TimeAggregationCount,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
},
|
||||
{
|
||||
MetricName: "error_total",
|
||||
TimeAggregation: metrictypes.TimeAggregationCount,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
|
||||
},
|
||||
firstSeenMap: mergeFirstSeenMaps(
|
||||
createFirstSeenMap("request_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc1", "prod"),
|
||||
createFirstSeenMap("error_total", defaultGroupByFields, defaultEvalTime, defaultDelay, true, "svc1", "prod"),
|
||||
),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "series with multiple groupBy attributes where one is new and one is old - Builder",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "request_total",
|
||||
TimeAggregation: metrictypes.TimeAggregationCount,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationSum,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "env"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1", "env": "prod"}, nil),
|
||||
},
|
||||
// service_name is old, env is new - should use max (new)
|
||||
firstSeenMap: mergeFirstSeenMaps(
|
||||
createFirstSeenMap("request_total", []string{"service_name"}, defaultEvalTime, defaultDelay, true, "svc1"),
|
||||
createFirstSeenMap("request_total", []string{"env"}, defaultEvalTime, defaultDelay, false, "prod"),
|
||||
),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{}, // max first_seen is new, so should be filtered out
|
||||
},
|
||||
{
|
||||
name: "Logs query - should skip filtering and return empty skip indexes",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Aggregations: []qbtypes.LogAggregation{
|
||||
{
|
||||
Expression: "count()",
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc2"}, nil),
|
||||
},
|
||||
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc2"}, nil),
|
||||
}, // Logs queries should return early, no filtering - all included
|
||||
},
|
||||
{
|
||||
name: "Traces query - should skip filtering and return empty skip indexes",
|
||||
compositeQuery: &v3.CompositeQuery{
|
||||
QueryType: v3.QueryTypeBuilder,
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "A",
|
||||
StepInterval: qbtypes.Step{Duration: 60 * time.Second},
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Aggregations: []qbtypes.TraceAggregation{
|
||||
{
|
||||
Expression: "count()",
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service_name"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
series: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc2"}, nil),
|
||||
},
|
||||
firstSeenMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
newGroupEvalDelay: &defaultDelay,
|
||||
evalTime: defaultEvalTime,
|
||||
expectedFiltered: []*v3.Series{
|
||||
createTestSeries(map[string]string{"service_name": "svc1"}, nil),
|
||||
createTestSeries(map[string]string{"service_name": "svc2"}, nil),
|
||||
}, // Traces queries should return early, no filtering - all included
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Create postableRule from compositeQuery
|
||||
postableRule := createPostableRule(tt.compositeQuery)
|
||||
|
||||
// Setup telemetry store mock
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
|
||||
|
||||
// Setup mock metadata store
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
|
||||
// Create query parser
|
||||
queryParser := queryparser.New(settings)
|
||||
|
||||
// Use query parser to extract metric names and groupBy fields
|
||||
analyzeResults, err := queryParser.AnalyzeQueryEnvelopes(context.Background(), tt.compositeQuery.Queries)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Aggregate results from all queries
|
||||
metricNames := []string{}
|
||||
groupedFields := []string{}
|
||||
for _, result := range analyzeResults {
|
||||
metricNames = append(metricNames, result.MetricNames...)
|
||||
for _, col := range result.GroupByColumns {
|
||||
groupedFields = append(groupedFields, col.OriginField)
|
||||
}
|
||||
}
|
||||
|
||||
// Setup metadata query mock
|
||||
mockMetadataStore.SetFirstSeenFromMetricMetadata(tt.firstSeenMap)
|
||||
|
||||
// Create reader with mocked telemetry store
|
||||
readerCache, err := cachetest.New(
|
||||
cache.Config{
|
||||
Provider: "memory",
|
||||
Memory: cache.Memory{
|
||||
NumCounters: 10 * 1000,
|
||||
MaxCost: 1 << 26,
|
||||
},
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReader(
|
||||
nil,
|
||||
telemetryStore,
|
||||
prometheustest.New(context.Background(), settings, prometheus.Config{}, telemetryStore),
|
||||
"",
|
||||
time.Duration(time.Second),
|
||||
nil,
|
||||
readerCache,
|
||||
options,
|
||||
)
|
||||
|
||||
// Set newGroupEvalDelay in NotificationSettings if provided
|
||||
if tt.newGroupEvalDelay != nil {
|
||||
postableRule.NotificationSettings = &ruletypes.NotificationSettings{
|
||||
NewGroupEvalDelay: func() *ruletypes.Duration {
|
||||
d := ruletypes.Duration(*tt.newGroupEvalDelay)
|
||||
return &d
|
||||
}(),
|
||||
}
|
||||
}
|
||||
|
||||
// Create BaseRule using NewBaseRule
|
||||
rule, err := NewBaseRule("test-rule", valuer.GenerateUUID(), &postableRule, reader, WithQueryParser(queryParser), WithLogger(logger), WithMetadataStore(mockMetadataStore))
|
||||
require.NoError(t, err)
|
||||
|
||||
filteredSeries, err := rule.FilterNewSeries(context.Background(), tt.evalTime, tt.series)
|
||||
|
||||
if tt.expectError {
|
||||
require.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
// Build a map to count occurrences of each unique label combination in expected series
|
||||
expectedCounts := make(map[string]int)
|
||||
for _, expected := range tt.expectedFiltered {
|
||||
key := labelsKey(expected.Labels)
|
||||
expectedCounts[key]++
|
||||
}
|
||||
|
||||
// Build a map to count occurrences of each unique label combination in filtered series
|
||||
actualCounts := make(map[string]int)
|
||||
for _, filtered := range filteredSeries {
|
||||
key := labelsKey(filtered.Labels)
|
||||
actualCounts[key]++
|
||||
}
|
||||
|
||||
// Verify counts match for all expected label combinations
|
||||
for key, expectedCount := range expectedCounts {
|
||||
actualCount := actualCounts[key]
|
||||
require.Equal(t, expectedCount, actualCount, "series with labels %s should appear %d times, but found %d times", key, expectedCount, actualCount)
|
||||
}
|
||||
|
||||
// Verify no unexpected series were found (all actual series should be in expected)
|
||||
require.Equal(t, len(tt.expectedFiltered), len(filteredSeries), "filtered series count should match expected")
|
||||
for key := range actualCounts {
|
||||
_, exists := expectedCounts[key]
|
||||
require.True(t, exists, "unexpected series found with labels: %s", key)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// labelsKey creates a deterministic string key from a labels map
|
||||
// This is used to group series by their unique label combinations
|
||||
func labelsKey(lbls map[string]string) string {
|
||||
if len(lbls) == 0 {
|
||||
return ""
|
||||
}
|
||||
return labels.FromMap(lbls).String()
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
"time"

"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/queryparser"

"go.uber.org/zap"

@@ -30,6 +31,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
"github.com/SigNoz/signoz/pkg/types/authtypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)

@@ -84,6 +86,7 @@ func prepareTaskName(ruleId interface{}) string {
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
TelemetryStore telemetrystore.TelemetryStore
MetadataStore telemetrytypes.MetadataStore
Prometheus prometheus.Prometheus

Context context.Context
@@ -103,6 +106,7 @@ type ManagerOptions struct {
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
SqlStore sqlstore.SQLStore
QueryParser queryparser.QueryParser
}

// The Manager manages recording and alerting rules.
@@ -125,6 +129,8 @@ type Manager struct {
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
orgGetter organization.Getter
// queryParser is used for parsing queries for rules
queryParser queryparser.QueryParser
}

func defaultOptions(o *ManagerOptions) *ManagerOptions {
@@ -166,6 +172,8 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.SLogger,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
WithMetadataStore(opts.ManagerOpts.MetadataStore),
)

if err != nil {
@@ -188,6 +196,8 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.Reader,
opts.ManagerOpts.Prometheus,
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
WithMetadataStore(opts.ManagerOpts.MetadataStore),
)

if err != nil {
@@ -226,6 +236,7 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
alertmanager: o.Alertmanager,
orgGetter: o.OrgGetter,
sqlstore: o.SqlStore,
queryParser: o.QueryParser,
}

zap.L().Debug("Manager created successfully with NotificationGroup")

@@ -119,6 +119,15 @@ func (r *PromRule) getPqlQuery() (string, error) {
return "", fmt.Errorf("invalid promql rule query")
}

func (r *PromRule) matrixToV3Series(res promql.Matrix) []*v3.Series {
v3Series := make([]*v3.Series, 0, len(res))
for _, series := range res {
commonSeries := toCommonSeries(series)
v3Series = append(v3Series, &commonSeries)
}
return v3Series
}

func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletypes.Vector, error) {
start, end := r.Timestamps(ts)
interval := 60 * time.Second // TODO(srikanthccv): this should be configurable
@@ -135,9 +144,21 @@ func (r *PromRule) buildAndRunQuery(ctx context.Context, ts time.Time) (ruletype
return nil, err
}

matrixToProcess := r.matrixToV3Series(res)
// Filter out new series if newGroupEvalDelay is configured
if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, matrixToProcess)
// In case of error we log the error and continue with the original series
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
} else {
matrixToProcess = filteredSeries
}
}

var resultVector ruletypes.Vector
for _, series := range res {
resultSeries, err := r.Threshold.Eval(toCommonSeries(series), r.Unit(), ruletypes.EvalData{
for _, series := range matrixToProcess {
resultSeries, err := r.Threshold.Eval(*series, r.Unit(), ruletypes.EvalData{
ActiveAlerts: r.ActiveAlertsLabelFP(),
SendUnmatched: r.ShouldSendUnmatched(),
})

@@ -52,6 +52,8 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
WithMetadataStore(opts.ManagerOpts.MetadataStore),
)

if err != nil {
@@ -72,6 +74,8 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
WithMetadataStore(opts.ManagerOpts.MetadataStore),
)

if err != nil {

@@ -561,7 +561,19 @@ func (r *ThresholdRule) buildAndRunQueryV5(ctx context.Context, orgID valuer.UUI
return resultVector, nil
}

for _, series := range queryResult.Series {
// Filter out new series if newGroupEvalDelay is configured
seriesToProcess := queryResult.Series
if r.ShouldSkipNewGroups() {
filteredSeries, filterErr := r.BaseRule.FilterNewSeries(ctx, ts, seriesToProcess)
// In case of error we log the error and continue with the original series
if filterErr != nil {
r.logger.ErrorContext(ctx, "Error filtering new series, ", "error", filterErr, "rule_name", r.Name())
} else {
seriesToProcess = filteredSeries
}
}

for _, series := range seriesToProcess {
if r.Condition() != nil && r.Condition().RequireMinPoints {
if len(series.Points) < r.Condition().RequiredNumPoints {
r.logger.InfoContext(ctx, "not enough data points to evaluate series, skipping", "ruleid", r.ID(), "numPoints", len(series.Points), "requiredPoints", r.Condition().RequiredNumPoints)

@@ -2,6 +2,7 @@ package queryfilterextractor

import (
"fmt"
"sort"
"strings"

clickhouse "github.com/AfterShip/clickhouse-sql-parser/parser"
@@ -87,6 +88,12 @@ func (e *ClickHouseFilterExtractor) Extract(query string) (*FilterResult, error)
result.GroupByColumns = append(result.GroupByColumns, colInfo)
}

// Sort the metric names and group by columns to return deterministic results which helps in tests as well
sort.Strings(result.MetricNames)
sort.Slice(result.GroupByColumns, func(i, j int) bool {
return result.GroupByColumns[i].Name < result.GroupByColumns[j].Name
})

return result, nil
}

@@ -331,6 +338,12 @@ func (e *ClickHouseFilterExtractor) stripTableAlias(name string) string {
return strings.Trim(name, "`")
}

// Handling for function calls like "UPPER(JSONExtractString(labels, 'region'))"
// the stripTableAlias function should return these as is
if strings.Contains(name, "(") && strings.Contains(name, ")") {
return name
}

// split the name by dot and return the last part
parts := strings.Split(name, ".")
if len(parts) > 1 {

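A simplified stand-in (not the extractor's actual method; the early backtick return is an assumption) illustrating the intended behaviour of stripTableAlias after this hunk: function-call expressions pass through unchanged, while dotted references keep only their final segment.

package main

import (
	"fmt"
	"strings"
)

// stripTableAlias is a simplified stand-in for the extractor's method.
func stripTableAlias(name string) string {
	// Backtick-quoted identifiers are returned without the quotes (assumed early return).
	if strings.HasPrefix(name, "`") && strings.HasSuffix(name, "`") {
		return strings.Trim(name, "`")
	}
	// Function calls like UPPER(JSONExtractString(labels, 'region')) are returned as is.
	if strings.Contains(name, "(") && strings.Contains(name, ")") {
		return name
	}
	// Otherwise split on "." and keep the last part, dropping a table alias.
	parts := strings.Split(name, ".")
	return parts[len(parts)-1]
}

func main() {
	fmt.Println(stripTableAlias("t.service_name"))                             // service_name
	fmt.Println(stripTableAlias("`os.type`"))                                  // os.type
	fmt.Println(stripTableAlias("UPPER(JSONExtractString(labels, 'region'))")) // unchanged
}
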
@@ -125,6 +125,62 @@ func TestClickHouseFilterExtractor_GroupByColumns(t *testing.T) {
{Name: "os.type", Alias: "os_type", OriginExpr: "`os.type`", OriginField: "os.type"},
},
},
{
name: "13 - Group by with function aliases in group by",
query: `
SELECT
toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(60)) as ts,
JSONExtractString(labels, 'region') AS region,
JSONExtractString(labels, 'instance') AS instance,
JSONExtractString(labels, 'http.method') AS http_method,
metric_name as metricName,
rand() as value
FROM signoz_metrics.time_series_v4
WHERE (metric_name IN ('test_metric_cardinality'))
GROUP BY
ts,
metric_name,
region,
instance,
http_method
`,
wantMetrics: []string{"test_metric_cardinality"},
wantGroupByColumns: []ColumnInfo{
{Name: "region", Alias: "", OriginExpr: "JSONExtractString(labels, 'region')", OriginField: "region"},
{Name: "instance", Alias: "", OriginExpr: "JSONExtractString(labels, 'instance')", OriginField: "instance"},
{Name: "http_method", Alias: "", OriginExpr: "JSONExtractString(labels, 'http.method')", OriginField: "http.method"},
{Name: "metric_name", Alias: "metricName", OriginExpr: "metric_name", OriginField: "metric_name"},
{Name: "ts", Alias: "", OriginExpr: "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(60))", OriginField: ""},
},
},
{
name: "14 - Group by with function group by column",
query: `
SELECT
toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(60)) as ts,
JSONExtractString(labels, 'region'),
JSONExtractString(labels, 'instance'),
JSONExtractString(labels, 'http.method'),
metric_name as metricName,
rand() as value
FROM signoz_metrics.time_series_v4
WHERE (metric_name IN ('test_metric_cardinality'))
GROUP BY
ts,
metric_name,
JSONExtractString(labels, 'region'),
JSONExtractString(labels, 'instance'),
JSONExtractString(labels, 'http.method')
`,
wantMetrics: []string{"test_metric_cardinality"},
wantGroupByColumns: []ColumnInfo{
{Name: "JSONExtractString(labels, 'region')", Alias: "", OriginExpr: "JSONExtractString(labels, 'region')", OriginField: "region"},
{Name: "JSONExtractString(labels, 'instance')", Alias: "", OriginExpr: "JSONExtractString(labels, 'instance')", OriginField: "instance"},
{Name: "JSONExtractString(labels, 'http.method')", Alias: "", OriginExpr: "JSONExtractString(labels, 'http.method')", OriginField: "http.method"},
{Name: "metric_name", Alias: "metricName", OriginExpr: "metric_name", OriginField: "metric_name"},
{Name: "ts", Alias: "", OriginExpr: "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(60))", OriginField: ""},
},
},
}

for _, tt := range tests {

@@ -1,6 +1,8 @@
package queryfilterextractor

import (
"sort"

"github.com/SigNoz/signoz/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"

@@ -45,6 +47,12 @@ func (e *PromQLFilterExtractor) Extract(query string) (*FilterResult, error) {
result.GroupByColumns = append(result.GroupByColumns, ColumnInfo{Name: groupKey, OriginExpr: groupKey, OriginField: groupKey})
}

// Sort the metric names and group by columns to return deterministic results which helps in tests as well
sort.Strings(result.MetricNames)
sort.Slice(result.GroupByColumns, func(i, j int) bool {
return result.GroupByColumns[i].Name < result.GroupByColumns[j].Name
})

return result, nil
}
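The PromQL extractor follows the same contract; a short usage sketch, with the expected output taken from the promql test fixture later in this commit (query P1). Illustrative only, not committed code.

package main

import (
	"fmt"

	"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
)

func main() {
	ex, _ := queryfilterextractor.NewExtractor(queryfilterextractor.ExtractorTypePromQL)
	res, _ := ex.Extract(`sum by (pod, region) (rate(http_requests_total[5m]))`)
	// Expected: MetricNames -> ["http_requests_total"], GroupByColumns -> pod, region (sorted by Name).
	fmt.Println(res.MetricNames, res.GroupByColumns)
}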
@@ -5,10 +5,14 @@ import (

"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)

// QueryParser defines the interface for parsing and analyzing queries.
type QueryParser interface {
// AnalyzeQueryFilter extracts filter conditions from a given query string.
AnalyzeQueryFilter(ctx context.Context, queryType querybuildertypesv5.QueryType, query string) (*queryfilterextractor.FilterResult, error)
// AnalyzeQueryEnvelopes extracts filter conditions from a list of query envelopes.
// Returns a map of query name to FilterResult.
AnalyzeQueryEnvelopes(ctx context.Context, queries []qbtypes.QueryEnvelope) (map[string]*queryfilterextractor.FilterResult, error)
}
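A consumer-side sketch of the interface; the helper below is not part of the commit (imports of context, fmt, the queryparser and qbtypes packages are assumed). It only shows how AnalyzeQueryEnvelopes is meant to be called and what its result maps to.

// printQueryFilters is an illustrative helper, not committed code.
func printQueryFilters(ctx context.Context, p queryparser.QueryParser, queries []qbtypes.QueryEnvelope) error {
	results, err := p.AnalyzeQueryEnvelopes(ctx, queries)
	if err != nil {
		return err
	}
	for name, res := range results {
		// One FilterResult per query name, including formula queries.
		fmt.Println(name, res.MetricNames, res.GroupByColumns)
	}
	return nil
}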
@@ -1,40 +0,0 @@
|
||||
package queryparser
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
|
||||
"github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
)
|
||||
|
||||
type queryParserImpl struct {
|
||||
settings factory.ProviderSettings
|
||||
}
|
||||
|
||||
// New creates a new implementation of the QueryParser service.
|
||||
func New(settings factory.ProviderSettings) QueryParser {
|
||||
return &queryParserImpl{
|
||||
settings: settings,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *queryParserImpl) AnalyzeQueryFilter(ctx context.Context, queryType querybuildertypesv5.QueryType, query string) (*queryfilterextractor.FilterResult, error) {
|
||||
var extractorType queryfilterextractor.ExtractorType
|
||||
switch queryType {
|
||||
case querybuildertypesv5.QueryTypePromQL:
|
||||
extractorType = queryfilterextractor.ExtractorTypePromQL
|
||||
case querybuildertypesv5.QueryTypeClickHouseSQL:
|
||||
extractorType = queryfilterextractor.ExtractorTypeClickHouseSQL
|
||||
default:
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported queryType: %s. Supported values are '%s' and '%s'", queryType, querybuildertypesv5.QueryTypePromQL, querybuildertypesv5.QueryTypeClickHouseSQL)
|
||||
}
|
||||
|
||||
// Create extractor
|
||||
extractor, err := queryfilterextractor.NewExtractor(extractorType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return extractor.Extract(query)
|
||||
}
|
||||
pkg/queryparser/queryparserimpl.go (new file, 168 lines)
@@ -0,0 +1,168 @@
|
||||
package queryparser
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/govaluate"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
)
|
||||
|
||||
type queryParserImpl struct {
|
||||
settings factory.ProviderSettings
|
||||
}
|
||||
|
||||
// New creates a new implementation of the QueryParser service.
|
||||
func New(settings factory.ProviderSettings) QueryParser {
|
||||
return &queryParserImpl{
|
||||
settings: settings,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *queryParserImpl) AnalyzeQueryFilter(ctx context.Context, queryType qbtypes.QueryType, query string) (*queryfilterextractor.FilterResult, error) {
|
||||
var extractorType queryfilterextractor.ExtractorType
|
||||
switch queryType {
|
||||
case qbtypes.QueryTypePromQL:
|
||||
extractorType = queryfilterextractor.ExtractorTypePromQL
|
||||
case qbtypes.QueryTypeClickHouseSQL:
|
||||
extractorType = queryfilterextractor.ExtractorTypeClickHouseSQL
|
||||
default:
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported queryType: %s. Supported values are '%s' and '%s'", queryType, qbtypes.QueryTypePromQL, qbtypes.QueryTypeClickHouseSQL)
|
||||
}
|
||||
|
||||
// Create extractor
|
||||
extractor, err := queryfilterextractor.NewExtractor(extractorType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return extractor.Extract(query)
|
||||
}
|
||||
|
||||
func (p *queryParserImpl) AnalyzeQueryEnvelopes(ctx context.Context, queries []qbtypes.QueryEnvelope) (map[string]*queryfilterextractor.FilterResult, error) {
|
||||
results := make(map[string]*queryfilterextractor.FilterResult)
|
||||
|
||||
// formulaQueries store the formula queries in the order they are defined
|
||||
formulaQueries := make(map[string]qbtypes.QueryBuilderFormula)
|
||||
|
||||
// First pass: Process non-formula queries
|
||||
for _, query := range queries {
|
||||
result := &queryfilterextractor.FilterResult{
|
||||
MetricNames: []string{},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{},
|
||||
}
|
||||
var queryName string
|
||||
|
||||
switch query.Type {
|
||||
case qbtypes.QueryTypeBuilder:
|
||||
switch spec := query.Spec.(type) {
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
|
||||
queryName = spec.Name
|
||||
// extract group by fields
|
||||
for _, groupBy := range spec.GroupBy {
|
||||
if groupBy.Name != "" {
|
||||
result.GroupByColumns = append(result.GroupByColumns, queryfilterextractor.ColumnInfo{Name: groupBy.Name, OriginExpr: groupBy.Name, OriginField: groupBy.Name, Alias: groupBy.Name})
|
||||
}
|
||||
}
|
||||
// extract metric names
|
||||
for _, aggregation := range spec.Aggregations {
|
||||
if aggregation.MetricName != "" {
|
||||
result.MetricNames = append(result.MetricNames, aggregation.MetricName)
|
||||
}
|
||||
}
|
||||
default:
|
||||
// TODO(abhishekhugetech): add support for Traces and Logs Aggregation types
|
||||
p.settings.Logger.WarnContext(ctx, "unsupported QueryBuilderQuery type: ", spec)
|
||||
// Skip result for this query
|
||||
continue
|
||||
}
|
||||
case qbtypes.QueryTypePromQL:
|
||||
spec, ok := query.Spec.(qbtypes.PromQuery)
|
||||
if !ok || spec.Query == "" {
|
||||
// Skip result for this query
|
||||
continue
|
||||
}
|
||||
queryName = spec.Name
|
||||
res, err := p.AnalyzeQueryFilter(ctx, qbtypes.QueryTypePromQL, spec.Query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.MetricNames = append(result.MetricNames, res.MetricNames...)
|
||||
result.GroupByColumns = append(result.GroupByColumns, res.GroupByColumns...)
|
||||
case qbtypes.QueryTypeClickHouseSQL:
|
||||
spec, ok := query.Spec.(qbtypes.ClickHouseQuery)
|
||||
if !ok || spec.Query == "" {
|
||||
// Skip result for this query
|
||||
continue
|
||||
}
|
||||
queryName = spec.Name
|
||||
res, err := p.AnalyzeQueryFilter(ctx, qbtypes.QueryTypeClickHouseSQL, spec.Query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.MetricNames = append(result.MetricNames, res.MetricNames...)
|
||||
result.GroupByColumns = append(result.GroupByColumns, res.GroupByColumns...)
|
||||
case qbtypes.QueryTypeFormula:
|
||||
spec, ok := query.Spec.(qbtypes.QueryBuilderFormula)
|
||||
if !ok {
|
||||
// Skip result for this query
|
||||
continue
|
||||
}
|
||||
formulaQueries[spec.Name] = spec
|
||||
default:
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported query type: %s", query.Type)
|
||||
}
|
||||
|
||||
if queryName != "" {
|
||||
results[queryName] = result
|
||||
}
|
||||
}
|
||||
|
||||
// Second pass: Process formula queries
|
||||
for _, query := range formulaQueries {
|
||||
result := &queryfilterextractor.FilterResult{
|
||||
MetricNames: []string{},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{},
|
||||
}
|
||||
|
||||
// Parse the expression to find used queries
|
||||
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, qbtypes.EvalFuncs())
|
||||
if err != nil {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to parse formula expression %s: %v", query.Name, err)
|
||||
}
|
||||
|
||||
uniqueMetricNames := make(map[string]bool)
|
||||
uniqueGroupByColumns := make(map[string]bool)
|
||||
|
||||
vars := expression.Vars()
|
||||
for _, v := range vars {
|
||||
// variables can be "A" or "A.0" or "A.alias" as per pkg/types/querybuildertypes/querybuildertypesv5/formula.go
|
||||
parts := strings.Split(v, ".")
|
||||
if len(parts) > 0 {
|
||||
refQueryName := parts[0]
|
||||
if refResult, exists := results[refQueryName]; exists {
|
||||
for _, metricName := range refResult.MetricNames {
|
||||
if !uniqueMetricNames[metricName] {
|
||||
uniqueMetricNames[metricName] = true
|
||||
result.MetricNames = append(result.MetricNames, metricName)
|
||||
}
|
||||
}
|
||||
for _, groupByColumn := range refResult.GroupByColumns {
|
||||
if !uniqueGroupByColumns[groupByColumn.Name] {
|
||||
uniqueGroupByColumns[groupByColumn.Name] = true
|
||||
result.GroupByColumns = append(result.GroupByColumns, groupByColumn)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add the formula query filter result to the results map
|
||||
results[query.Name] = result
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
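The formula pass above boils down to a set union over the results of the queries referenced by the expression. A standalone sketch of that dedup step follows (plain Go, no govaluate); it assumes the referenced query names have already been parsed out of the expression, and the sample data is illustrative.

package main

import "fmt"

func main() {
	// MetricNames already extracted for the referenced queries.
	results := map[string][]string{
		"A": {"cpu_usage"},
		"B": {"mem_usage", "cpu_usage"},
	}
	refs := []string{"A", "B", "A"} // variables found in the formula expression, duplicates allowed

	seen := map[string]bool{}
	union := []string{}
	for _, ref := range refs {
		for _, m := range results[ref] {
			if !seen[m] {
				seen[m] = true
				union = append(union, m)
			}
		}
	}
	fmt.Println(union) // [cpu_usage mem_usage]
}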
pkg/queryparser/queryparserimpl_test.go (new file, 214 lines)
@@ -0,0 +1,214 @@
|
||||
package queryparser
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser/queryfilterextractor"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBaseRule_ExtractMetricAndGroupBys(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
payload string
|
||||
wantResults map[string]*queryfilterextractor.FilterResult
|
||||
}{
|
||||
{
|
||||
name: "builder multiple grouping",
|
||||
payload: builderQueryWithGrouping,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"A": {
|
||||
MetricNames: []string{"test_metric_cardinality", "cpu_usage_total"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{
|
||||
{Name: "service_name", Alias: "service_name", OriginExpr: "service_name", OriginField: "service_name"},
|
||||
{Name: "env", Alias: "env", OriginExpr: "env", OriginField: "env"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "builder single grouping",
|
||||
payload: builderQuerySingleGrouping,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"B": {
|
||||
MetricNames: []string{"latency_p50"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{
|
||||
{Name: "namespace", Alias: "namespace", OriginExpr: "namespace", OriginField: "namespace"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "builder no grouping",
|
||||
payload: builderQueryNoGrouping,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"C": {
|
||||
MetricNames: []string{"disk_usage_total"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "promql multiple grouping",
|
||||
payload: promQueryWithGrouping,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"P1": {
|
||||
MetricNames: []string{"http_requests_total"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{
|
||||
{Name: "pod", Alias: "", OriginExpr: "pod", OriginField: "pod"},
|
||||
{Name: "region", Alias: "", OriginExpr: "region", OriginField: "region"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "promql single grouping",
|
||||
payload: promQuerySingleGrouping,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"P2": {
|
||||
MetricNames: []string{"cpu_usage_seconds_total"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{
|
||||
{Name: "env", Alias: "", OriginExpr: "env", OriginField: "env"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "promql no grouping",
|
||||
payload: promQueryNoGrouping,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"P3": {
|
||||
MetricNames: []string{"node_cpu_seconds_total"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "clickhouse multiple grouping",
|
||||
payload: clickHouseQueryWithGrouping,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"CH1": {
|
||||
MetricNames: []string{"cpu"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{
|
||||
{Name: "region", Alias: "r", OriginExpr: "region", OriginField: "region"},
|
||||
{Name: "zone", Alias: "", OriginExpr: "zone", OriginField: "zone"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "clickhouse single grouping",
|
||||
payload: clickHouseQuerySingleGrouping,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"CH2": {
|
||||
MetricNames: []string{"cpu_usage"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{
|
||||
{Name: "region", Alias: "r", OriginExpr: "region", OriginField: "region"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "clickhouse no grouping",
|
||||
payload: clickHouseQueryNoGrouping,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"CH3": {
|
||||
MetricNames: []string{"memory_usage"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "builder formula for builder queries",
|
||||
payload: builderQueryWithFormula,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"A": {
|
||||
MetricNames: []string{"cpu_usage"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{},
|
||||
},
|
||||
"B": {
|
||||
MetricNames: []string{"mem_usage"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{},
|
||||
},
|
||||
"F1": {
|
||||
MetricNames: []string{"cpu_usage", "mem_usage"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "builder formula with group by",
|
||||
payload: builderQueryWithFormulaAndGroupBy,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"A": {
|
||||
MetricNames: []string{"cpu"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{
|
||||
{Name: "host", Alias: "host", OriginExpr: "host", OriginField: "host"},
|
||||
{Name: "region", Alias: "region", OriginExpr: "region", OriginField: "region"},
|
||||
},
|
||||
},
|
||||
"B": {
|
||||
MetricNames: []string{"mem"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{
|
||||
{Name: "host", Alias: "host", OriginExpr: "host", OriginField: "host"},
|
||||
{Name: "instance", Alias: "instance", OriginExpr: "instance", OriginField: "instance"},
|
||||
},
|
||||
},
|
||||
"F1": {
|
||||
MetricNames: []string{"cpu", "mem"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{
|
||||
{Name: "host", Alias: "host", OriginExpr: "host", OriginField: "host"},
|
||||
{Name: "instance", Alias: "instance", OriginExpr: "instance", OriginField: "instance"},
|
||||
{Name: "region", Alias: "region", OriginExpr: "region", OriginField: "region"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "builder formula referencing same query multiple times",
|
||||
payload: builderQueryWithFormulaSameQuery,
|
||||
wantResults: map[string]*queryfilterextractor.FilterResult{
|
||||
"A": {
|
||||
MetricNames: []string{"disk_used"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{},
|
||||
},
|
||||
"F1": {
|
||||
MetricNames: []string{"disk_used"},
|
||||
GroupByColumns: []queryfilterextractor.ColumnInfo{},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
queryParser := New(instrumentationtest.New().ToProviderSettings())
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
queryEnvelopes := mustCompositeQueryEnvelope(t, tt.payload)
|
||||
results, err := queryParser.AnalyzeQueryEnvelopes(ctx, queryEnvelopes)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, results, len(queryEnvelopes), "number of results should match number of queries")
|
||||
|
||||
// Check each expected query result
|
||||
for queryName, expectedResult := range tt.wantResults {
|
||||
result, ok := results[queryName]
|
||||
require.True(t, ok, "query %s should be present in results", queryName)
|
||||
require.ElementsMatch(t, expectedResult.MetricNames, result.MetricNames, "metrics mismatch for query %s", queryName)
|
||||
require.ElementsMatch(t, expectedResult.GroupByColumns, result.GroupByColumns, "group by columns mismatch for query %s", queryName)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func mustCompositeQueryEnvelope(t *testing.T, payload string) []qbtypes.QueryEnvelope {
|
||||
t.Helper()
|
||||
var queryEnvelopes []qbtypes.QueryEnvelope
|
||||
require.NoError(t, json.Unmarshal([]byte(payload), &queryEnvelopes))
|
||||
return queryEnvelopes
|
||||
}
|
||||
pkg/queryparser/queryparserimpl_testdata.go (new file, 255 lines)
@@ -0,0 +1,255 @@
|
||||
package queryparser
|
||||
|
||||
var (
|
||||
builderQueryWithGrouping = `
|
||||
[
|
||||
{
|
||||
"type":"builder_query",
|
||||
"spec":{
|
||||
"name":"A",
|
||||
"signal":"metrics",
|
||||
"stepInterval":null,
|
||||
"disabled":false,
|
||||
"filter":{"expression":""},
|
||||
"groupBy":[
|
||||
{"name":"service_name","fieldDataType":"","fieldContext":""},
|
||||
{"name":"env","fieldDataType":"","fieldContext":""}
|
||||
],
|
||||
"aggregations":[
|
||||
{"metricName":"test_metric_cardinality","timeAggregation":"count","spaceAggregation":"sum"},
|
||||
{"metricName":"cpu_usage_total","timeAggregation":"avg","spaceAggregation":"avg"}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
builderQuerySingleGrouping = `
|
||||
[
|
||||
{
|
||||
"type":"builder_query",
|
||||
"spec":{
|
||||
"name":"B",
|
||||
"signal":"metrics",
|
||||
"stepInterval":null,
|
||||
"disabled":false,
|
||||
"groupBy":[
|
||||
{"name":"namespace","fieldDataType":"","fieldContext":""}
|
||||
],
|
||||
"aggregations":[
|
||||
{"metricName":"latency_p50","timeAggregation":"avg","spaceAggregation":"max"}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
builderQueryNoGrouping = `
|
||||
[
|
||||
{
|
||||
"type":"builder_query",
|
||||
"spec":{
|
||||
"name":"C",
|
||||
"signal":"metrics",
|
||||
"stepInterval":null,
|
||||
"disabled":false,
|
||||
"groupBy":[],
|
||||
"aggregations":[
|
||||
{"metricName":"disk_usage_total","timeAggregation":"sum","spaceAggregation":"sum"}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
promQueryWithGrouping = `
|
||||
[
|
||||
{
|
||||
"type":"promql",
|
||||
"spec":{
|
||||
"name":"P1",
|
||||
"query":"sum by (pod,region) (rate(http_requests_total[5m]))",
|
||||
"disabled":false,
|
||||
"step":0,
|
||||
"stats":false
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
promQuerySingleGrouping = `
|
||||
[
|
||||
{
|
||||
"type":"promql",
|
||||
"spec":{
|
||||
"name":"P2",
|
||||
"query":"sum by (env)(rate(cpu_usage_seconds_total{job=\"api\"}[5m]))",
|
||||
"disabled":false,
|
||||
"step":0,
|
||||
"stats":false
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
promQueryNoGrouping = `
|
||||
[
|
||||
{
|
||||
"type":"promql",
|
||||
"spec":{
|
||||
"name":"P3",
|
||||
"query":"rate(node_cpu_seconds_total[1m])",
|
||||
"disabled":false,
|
||||
"step":0,
|
||||
"stats":false
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
clickHouseQueryWithGrouping = `
|
||||
[
|
||||
{
|
||||
"type":"clickhouse_sql",
|
||||
"spec":{
|
||||
"name":"CH1",
|
||||
"query":"SELECT region as r, zone FROM metrics WHERE metric_name='cpu' GROUP BY region, zone",
|
||||
"disabled":false
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
clickHouseQuerySingleGrouping = `
|
||||
[
|
||||
{
|
||||
"type":"clickhouse_sql",
|
||||
"spec":{
|
||||
"name":"CH2",
|
||||
"query":"SELECT region as r FROM metrics WHERE metric_name='cpu_usage' GROUP BY region",
|
||||
"disabled":false
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
clickHouseQueryNoGrouping = `
|
||||
[
|
||||
{
|
||||
"type":"clickhouse_sql",
|
||||
"spec":{
|
||||
"name":"CH3",
|
||||
"query":"SELECT * FROM metrics WHERE metric_name = 'memory_usage'",
|
||||
"disabled":false
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
builderQueryWithFormula = `
|
||||
[
|
||||
{
|
||||
"type":"builder_query",
|
||||
"spec":{
|
||||
"name":"A",
|
||||
"signal":"metrics",
|
||||
"stepInterval":null,
|
||||
"disabled":false,
|
||||
"groupBy":[],
|
||||
"aggregations":[
|
||||
{"metricName":"cpu_usage","timeAggregation":"avg","spaceAggregation":"sum"}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"type":"builder_query",
|
||||
"spec":{
|
||||
"name":"B",
|
||||
"signal":"metrics",
|
||||
"stepInterval":null,
|
||||
"disabled":false,
|
||||
"groupBy":[],
|
||||
"aggregations":[
|
||||
{"metricName":"mem_usage","timeAggregation":"avg","spaceAggregation":"sum"}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"type":"builder_formula",
|
||||
"spec":{
|
||||
"name":"F1",
|
||||
"expression":"A + B"
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
builderQueryWithFormulaAndGroupBy = `
|
||||
[
|
||||
{
|
||||
"type":"builder_query",
|
||||
"spec":{
|
||||
"name":"A",
|
||||
"signal":"metrics",
|
||||
"stepInterval":null,
|
||||
"disabled":false,
|
||||
"groupBy":[
|
||||
{"name":"host","fieldDataType":"","fieldContext":""},
|
||||
{"name":"region","fieldDataType":"","fieldContext":""}
|
||||
],
|
||||
"aggregations":[
|
||||
{"metricName":"cpu","timeAggregation":"avg","spaceAggregation":"sum"}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"type":"builder_query",
|
||||
"spec":{
|
||||
"name":"B",
|
||||
"signal":"metrics",
|
||||
"stepInterval":null,
|
||||
"disabled":false,
|
||||
"groupBy":[
|
||||
{"name":"host","fieldDataType":"","fieldContext":""},
|
||||
{"name":"instance","fieldDataType":"","fieldContext":""}
|
||||
],
|
||||
"aggregations":[
|
||||
{"metricName":"mem","timeAggregation":"avg","spaceAggregation":"sum"}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"type":"builder_formula",
|
||||
"spec":{
|
||||
"name":"F1",
|
||||
"expression":"A + B"
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
builderQueryWithFormulaSameQuery = `
|
||||
[
|
||||
{
|
||||
"type":"builder_query",
|
||||
"spec":{
|
||||
"name":"A",
|
||||
"signal":"metrics",
|
||||
"stepInterval":null,
|
||||
"disabled":false,
|
||||
"groupBy":[],
|
||||
"aggregations":[
|
||||
{"metricName":"disk_used","timeAggregation":"sum","spaceAggregation":"sum"}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"type":"builder_formula",
|
||||
"spec":{
|
||||
"name":"F1",
|
||||
"expression":"A + A"
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
)
|
||||
@@ -1767,3 +1767,84 @@ func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Cont
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// chunkSizeFirstSeenMetricMetadata limits the number of tuples per SQL query to avoid hitting the max_query_size limit.
|
||||
//
|
||||
// Calculation Logic:
|
||||
//
|
||||
// 1. The Ceiling: 250 KB (256,000 bytes). A conservative safety limit set just below the common DB max_query_size (262KB)
|
||||
// to guarantee the database does not reject the query.
|
||||
// Reference: https://clickhouse.com/docs/operations/settings/settings#max_query_size
|
||||
//
|
||||
// 2. Unit Cost: ~150 bytes per tuple. The estimated "weight" of a single lookup key, summing MetricName (40B),
|
||||
// AttrName (30B), AttrValue (64B), and SQL syntax overhead (16B).
|
||||
//
|
||||
// 3. Theoretical Max: ~1,706 tuples. The absolute maximum capacity if all keys adhere to the average size (256,000 / 150).
|
||||
//
|
||||
// 4. Final Limit: 1600. Rounds down to provide a ~6% safety buffer, accounting for data outliers
|
||||
// (unusually long attribute values) and multi-byte character expansion.
|
||||
const chunkSizeFirstSeenMetricMetadata = 1600
|
||||
|
||||
// GetFirstSeenFromMetricMetadata queries the metadata table to get the first_seen timestamp
|
||||
// for each metric-attribute-value combination.
|
||||
// Returns a map where key is `telemetrytypes.MetricMetadataLookupKey` and value is first_seen in milliseconds.
|
||||
func (t *telemetryMetaStore) GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []telemetrytypes.MetricMetadataLookupKey) (map[telemetrytypes.MetricMetadataLookupKey]int64, error) {
|
||||
result := make(map[telemetrytypes.MetricMetadataLookupKey]int64)
|
||||
|
||||
for i := 0; i < len(lookupKeys); i += chunkSizeFirstSeenMetricMetadata {
|
||||
end := i + chunkSizeFirstSeenMetricMetadata
|
||||
if end > len(lookupKeys) {
|
||||
end = len(lookupKeys)
|
||||
}
|
||||
chunk := lookupKeys[i:end]
|
||||
|
||||
sb := sqlbuilder.Select(
|
||||
"metric_name",
|
||||
"attr_name",
|
||||
"attr_string_value",
|
||||
"min(first_reported_unix_milli) AS first_seen",
|
||||
).From(t.metricsDBName + "." + t.metricsFieldsTblName)
|
||||
|
||||
lookupItems := make([]interface{}, 0, len(chunk))
|
||||
for _, key := range chunk {
|
||||
lookupItems = append(lookupItems, sqlbuilder.Tuple(key.MetricName, key.AttributeName, key.AttributeValue))
|
||||
}
|
||||
sb.Where(
|
||||
sb.In(
|
||||
sqlbuilder.TupleNames("metric_name", "attr_name", "attr_string_value"),
|
||||
lookupItems...,
|
||||
),
|
||||
)
|
||||
sb.GroupBy("metric_name", "attr_name", "attr_string_value")
|
||||
sb.OrderBy("first_seen")
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to query metadata for first_seen")
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var metricName, attrName, attrValue string
|
||||
var firstSeen uint64
|
||||
if err := rows.Scan(&metricName, &attrName, &attrValue, &firstSeen); err != nil {
|
||||
rows.Close()
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan metadata first_seen result")
|
||||
}
|
||||
result[telemetrytypes.MetricMetadataLookupKey{
|
||||
MetricName: metricName,
|
||||
AttributeName: attrName,
|
||||
AttributeValue: attrValue,
|
||||
}] = int64(firstSeen)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
rows.Close()
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to iterate metadata first_seen results")
|
||||
}
|
||||
rows.Close()
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
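The numbers in the chunk-size comment above can be sanity-checked directly. The constants below simply restate the figures from that comment; they are not additional constants from the code.

package main

import "fmt"

func main() {
	const maxQueryBytes = 256_000 // ceiling kept just under ClickHouse's default max_query_size
	const bytesPerTuple = 150     // estimated cost of one (metric_name, attr_name, attr_string_value) tuple
	const chunkSize = 1600

	theoreticalMax := maxQueryBytes / bytesPerTuple // 1706
	buffer := 100 * (1 - float64(chunkSize)/float64(theoreticalMax))
	fmt.Printf("max %d tuples, chunk %d, ~%.0f%% buffer\n", theoreticalMax, chunkSize, buffer)
}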
pkg/telemetrymetadata/metadata_query_test.go (new file, 86 lines)
@@ -0,0 +1,86 @@
|
||||
package telemetrymetadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
cmock "github.com/srikanthccv/ClickHouse-go-mock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGetFirstSeenFromMetricMetadata(t *testing.T) {
|
||||
mockTelemetryStore := telemetrystoretest.New(telemetrystore.Config{}, ®exMatcher{})
|
||||
mock := mockTelemetryStore.Mock()
|
||||
|
||||
metadata := NewTelemetryMetaStore(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockTelemetryStore,
|
||||
telemetrytraces.DBName,
|
||||
telemetrytraces.TagAttributesV2TableName,
|
||||
telemetrytraces.SpanAttributesKeysTblName,
|
||||
telemetrytraces.SpanIndexV3TableName,
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.SamplesAgg1dTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
telemetrylogs.LogAttributeKeysTblName,
|
||||
telemetrylogs.LogResourceKeysTblName,
|
||||
DBName,
|
||||
AttributesMetadataLocalTableName,
|
||||
)
|
||||
|
||||
lookupKeys := []telemetrytypes.MetricMetadataLookupKey{
|
||||
{
|
||||
MetricName: "metric1",
|
||||
AttributeName: "attr1",
|
||||
AttributeValue: "val1",
|
||||
},
|
||||
{
|
||||
MetricName: "metric2",
|
||||
AttributeName: "attr2",
|
||||
AttributeValue: "val2",
|
||||
},
|
||||
}
|
||||
|
||||
// ClickHouse tuple syntax is (x, y, z)
|
||||
// the structure should lead to:
|
||||
// SELECT ... WHERE (metric_name, attr_name, attr_string_value) IN ((?, ?, ?), (?, ?, ?)) ...
|
||||
|
||||
expectedQuery := `SELECT metric_name, attr_name, attr_string_value, min\(first_reported_unix_milli\) AS first_seen FROM signoz_metrics.distributed_metadata WHERE \(metric_name, attr_name, attr_string_value\) IN \(\(\?, \?, \?\), \(\?, \?, \?\)\) GROUP BY metric_name, attr_name, attr_string_value ORDER BY first_seen`
|
||||
|
||||
// Note: regexMatcher uses regexp.MatchString, so we escape parens and ?
|
||||
|
||||
mock.ExpectQuery(expectedQuery).
|
||||
WithArgs("metric1", "attr1", "val1", "metric2", "attr2", "val2").
|
||||
WillReturnRows(cmock.NewRows([]cmock.ColumnType{
|
||||
{Name: "metric_name", Type: "String"},
|
||||
{Name: "attr_name", Type: "String"},
|
||||
{Name: "attr_string_value", Type: "String"},
|
||||
{Name: "first_seen", Type: "UInt64"},
|
||||
}, [][]any{
|
||||
{"metric1", "attr1", "val1", uint64(1000)},
|
||||
{"metric2", "attr2", "val2", uint64(2000)},
|
||||
}))
|
||||
|
||||
result, err := metadata.GetFirstSeenFromMetricMetadata(context.Background(), lookupKeys)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, int64(1000), result[lookupKeys[0]])
|
||||
assert.Equal(t, int64(2000), result[lookupKeys[1]])
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("there were unfulfilled expectations: %s", err)
|
||||
}
|
||||
}
|
||||
@@ -70,6 +70,8 @@ type NotificationSettings struct {
GroupBy []string `json:"groupBy,omitempty"`
Renotify Renotify `json:"renotify,omitempty"`
UsePolicy bool `json:"usePolicy,omitempty"`
// NewGroupEvalDelay is the grace period for new series to be excluded from alerts evaluation
NewGroupEvalDelay *Duration `json:"newGroupEvalDelay,omitempty"`
}

type Renotify struct {
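To show where the new field sits, a hypothetical construction of the settings. The ruletypes package alias and the Duration conversion are assumptions about the surrounding package, not taken from this diff.

// Hypothetical wiring; ruletypes is an assumed alias for the package that
// defines NotificationSettings and Duration, and the conversion assumes
// Duration wraps time.Duration.
delay := ruletypes.Duration(30 * time.Minute)
settings := ruletypes.NotificationSettings{
	GroupBy:           []string{"service.name"},
	UsePolicy:         false,
	NewGroupEvalDelay: &delay, // new series get a 30 minute grace period before evaluation
}
_ = settings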
@@ -40,4 +40,13 @@ type MetadataStore interface {

// PromotePaths promotes the paths.
PromotePaths(ctx context.Context, paths ...string) error

// GetFirstSeenFromMetricMetadata gets the first seen timestamp for a metric metadata lookup key.
GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []MetricMetadataLookupKey) (map[MetricMetadataLookupKey]int64, error)
}

type MetricMetadataLookupKey struct {
MetricName string
AttributeName string
AttributeValue string
}
@@ -18,6 +18,7 @@ type MockMetadataStore struct {
TemporalityMap map[string]metrictypes.Temporality
PromotedPathsMap map[string]struct{}
LogsJSONIndexesMap map[string][]schemamigrator.Index
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
}

// NewMockMetadataStore creates a new instance of MockMetadataStore with initialized maps

@@ -29,6 +30,7 @@ func NewMockMetadataStore() *MockMetadataStore {
TemporalityMap: make(map[string]metrictypes.Temporality),
PromotedPathsMap: make(map[string]struct{}),
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
}
}

@@ -307,3 +309,13 @@ func (m *MockMetadataStore) ListPromotedPaths(ctx context.Context, paths ...stri
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
return m.LogsJSONIndexesMap, nil
}

func (m *MockMetadataStore) GetFirstSeenFromMetricMetadata(ctx context.Context, lookupKeys []telemetrytypes.MetricMetadataLookupKey) (map[telemetrytypes.MetricMetadataLookupKey]int64, error) {
return m.LookupKeysMap, nil
}

func (m *MockMetadataStore) SetFirstSeenFromMetricMetadata(firstSeenMap map[telemetrytypes.MetricMetadataLookupKey]int64) {
	for key, value := range firstSeenMap {
		m.LookupKeysMap[key] = value
	}
}
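Finally, a sketch of how the mock's two new methods pair up in a test. It assumes it runs alongside the mock's own package (imports of context, fmt, and telemetrytypes assumed); the key literal mirrors the lookup-key struct added above and the timestamp is illustrative.

// Test-side sketch, not committed code.
store := NewMockMetadataStore()
key := telemetrytypes.MetricMetadataLookupKey{
	MetricName:     "http_requests_total",
	AttributeName:  "service.name",
	AttributeValue: "checkout",
}
// Seed the mock, then read back through the interface method the ruler uses.
store.SetFirstSeenFromMetricMetadata(map[telemetrytypes.MetricMetadataLookupKey]int64{key: 1_700_000_000_000})
firstSeen, _ := store.GetFirstSeenFromMetricMetadata(context.Background(), []telemetrytypes.MetricMetadataLookupKey{key})
fmt.Println(firstSeen[key]) // 1700000000000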