Compare commits

..

1 Commits

Author SHA1 Message Date
nityanandagohain
aa140b3456 feat: trace based filters for logs, supporting aggregations as well 2026-05-21 18:47:25 +05:30
17 changed files with 701 additions and 680 deletions

View File

@@ -129,8 +129,6 @@ components:
type: string
schedule:
$ref: '#/components/schemas/AlertmanagertypesSchedule'
scope:
type: string
status:
$ref: '#/components/schemas/AlertmanagertypesMaintenanceStatus'
updatedAt:
@@ -274,8 +272,6 @@ components:
type: string
schedule:
$ref: '#/components/schemas/AlertmanagertypesSchedule'
scope:
type: string
required:
- name
- schedule

View File

@@ -225,10 +225,6 @@ export interface AlertmanagertypesPlannedMaintenanceDTO {
*/
name: string;
schedule: AlertmanagertypesScheduleDTO;
/**
* @type string
*/
scope?: string;
status: AlertmanagertypesMaintenanceStatusDTO;
/**
* @type string
@@ -1718,10 +1714,6 @@ export interface AlertmanagertypesPostablePlannedMaintenanceDTO {
*/
name: string;
schedule: AlertmanagertypesScheduleDTO;
/**
* @type string
*/
scope?: string;
}
export interface AlertmanagertypesPostableRoutePolicyDTO {

View File

@@ -1,5 +1,5 @@
import React, { useCallback, useEffect, useMemo, useState } from 'react';
import { Check, Info } from '@signozhq/icons';
import { Check } from '@signozhq/icons';
import {
Button,
DatePicker,
@@ -11,7 +11,6 @@ import {
Select,
SelectProps,
Spin,
Tooltip,
} from 'antd';
import { Typography } from '@signozhq/ui/typography';
import type { DefaultOptionType } from 'antd/es/select';
@@ -79,7 +78,6 @@ interface PlannedDowntimeFormData {
alertRules: DefaultOptionType[];
recurrenceSelect?: AlertmanagertypesRecurrenceDTO;
timezone?: string;
scope?: string;
}
const customFormat = DATE_TIME_FORMATS.ORDINAL_DATETIME;
@@ -146,7 +144,6 @@ export function PlannedDowntimeForm(
.map((alert) => alert.value)
.filter((alert) => alert !== undefined) as string[],
name: values.name,
scope: values.scope,
schedule: {
startTime: values.startTime?.format(),
endTime: values.endTime?.format(),
@@ -281,7 +278,6 @@ export function PlannedDowntimeForm(
duration: getDurationInfo(schedule?.recurrence?.duration)?.value ?? '',
} as AlertmanagertypesRecurrenceDTO,
timezone: schedule?.timezone as string,
scope: initialValues.scope || '',
};
}, [initialValues, alertOptions]);
@@ -315,7 +311,7 @@ export function PlannedDowntimeForm(
default:
return `Scheduled for ${formattedStartDate} starting at ${formattedStartTime}.`;
}
}, [formData, recurrenceType]);
}, [formData, recurrenceType, timezone]);
const endTimeText = useMemo((): string => {
const endTime = formData.endTime;
@@ -326,7 +322,7 @@ export function PlannedDowntimeForm(
const formattedEndTime = endTime.format(TIME_FORMAT);
const formattedEndDate = endTime.format(DATE_FORMAT);
return `Scheduled to end maintenance on ${formattedEndDate} at ${formattedEndTime}.`;
}, [formData, recurrenceType]);
}, [formData, recurrenceType, timezone]);
return (
<Modal
@@ -492,36 +488,6 @@ export function PlannedDowntimeForm(
</Select>
</Form.Item>
</div>
<Form.Item
label={
<span>
Scope&nbsp;
<Tooltip
mouseLeaveDelay={0.3}
title={
<span>
Scope the planned downtime by alert labels.{' '}
<a
href="https://signoz.io/docs/alerts-management/planned-maintenance/#scoping-with-label-expressions"
target="_blank"
rel="noopener noreferrer"
>
Learn more
</a>
</span>
}
>
<Info size={13} />
</Tooltip>
</span>
}
name="scope"
>
<Input.TextArea
placeholder='e.g. env = "prod" AND region = "us-east-1"'
autoSize={{ minRows: 2, maxRows: 4 }}
/>
</Form.Item>
<Form.Item style={{ marginBottom: 0 }}>
<ModalButtonWrapper>
<Button

View File

@@ -42,7 +42,7 @@ func (m *MaintenanceMuter) Mutes(ctx context.Context, lset model.LabelSet) bool
}
now := time.Now()
for _, mw := range m.getMaintenances(ctx) {
if mw.ShouldSkip(ruleID, now, lset) {
if mw.ShouldSkip(ruleID, now) {
return true
}
}
@@ -61,7 +61,7 @@ func (m *MaintenanceMuter) MutedBy(ctx context.Context, lset model.LabelSet) []s
var ids []string
now := time.Now()
for _, mw := range m.getMaintenances(ctx) {
if mw.ShouldSkip(ruleID, now, lset) {
if mw.ShouldSkip(ruleID, now) {
ids = append(ids, mw.ID.String())
}
}

View File

@@ -87,25 +87,18 @@ func TestEndToEndAlertManagerFlow(t *testing.T) {
err = notificationManager.SetNotificationConfig(orgID, "high-cpu-usage", &notifConfig)
require.NoError(t, err)
activeSchedule := &alertmanagertypes.Schedule{
Timezone: "UTC",
StartTime: time.Now().Add(-time.Hour),
EndTime: time.Now().Add(time.Hour),
}
// mwRuleIDAndScope: only critical high-cpu-usage alerts.
mwRuleIDAndScope := valuer.GenerateUUID()
// mwRuleIDOnly: all high-cpu-usage alerts regardless of severity.
mwRuleIDOnly := valuer.GenerateUUID()
// mwScopeOnly: all critical alerts regardless of rule ID.
mwScopeOnly := valuer.GenerateUUID()
mwID := valuer.GenerateUUID()
maintenanceStore := alertmanagertypestest.NewMockMaintenanceStore(t)
maintenanceStore.On("ListPlannedMaintenance", mock.Anything, orgID).Return(
[]*alertmanagertypes.PlannedMaintenance{
{ID: mwRuleIDAndScope, Schedule: activeSchedule, RuleIDs: []string{"high-cpu-usage"}, Scope: `severity == "critical"`},
{ID: mwRuleIDOnly, Schedule: activeSchedule, RuleIDs: []string{"high-cpu-usage"}},
{ID: mwScopeOnly, Schedule: activeSchedule, Scope: `severity == "critical"`},
}, nil,
[]*alertmanagertypes.PlannedMaintenance{{
ID: mwID,
Schedule: &alertmanagertypes.Schedule{
Timezone: "UTC",
StartTime: time.Now().Add(-time.Hour),
EndTime: time.Now().Add(time.Hour),
},
RuleIDs: []string{"high-cpu-usage"},
}}, nil,
)
srvCfg := NewConfig()
@@ -256,42 +249,18 @@ func TestEndToEndAlertManagerFlow(t *testing.T) {
require.Equal(t, "{__receiver__=\"webhook\"}:{cluster=\"prod-cluster\", instance=\"server-03\", ruleId=\"high-cpu-usage\"}", alertGroups[2].GroupKey)
})
req, err := http.NewRequest(http.MethodGet, "/alerts", nil)
require.NoError(t, err)
params, err := alertmanagertypes.NewGettableAlertsParams(req)
require.NoError(t, err)
alerts, err := server.GetAlerts(ctx, params)
require.NoError(t, err)
t.Run("verify_muting_ruleid_and_scope", func(t *testing.T) {
// Window with ruleID + scope mutes only alerts matching both.
for _, alert := range alerts {
if alert.Labels["ruleId"] == "high-cpu-usage" && alert.Labels["severity"] == "critical" {
require.Contains(t, alert.Status.MutedBy, mwRuleIDAndScope.String())
} else {
require.NotContains(t, alert.Status.MutedBy, mwRuleIDAndScope.String())
}
}
})
t.Run("verify_muting_ruleid_only", func(t *testing.T) {
// Window with ruleID but no scope mutes all severities for that rule.
t.Run("verify_muting", func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, "/alerts", nil)
require.NoError(t, err)
params, err := alertmanagertypes.NewGettableAlertsParams(req)
require.NoError(t, err)
alerts, err := server.GetAlerts(ctx, params)
require.NoError(t, err)
for _, alert := range alerts {
if alert.Labels["ruleId"] == "high-cpu-usage" {
require.Contains(t, alert.Status.MutedBy, mwRuleIDOnly.String())
require.Equal(t, []string{mwID.String()}, alert.Status.MutedBy)
} else {
require.NotContains(t, alert.Status.MutedBy, mwRuleIDOnly.String())
}
}
})
t.Run("verify_muting_scope_only", func(t *testing.T) {
// Window with scope but no ruleIDs mutes all critical alerts regardless of rule.
for _, alert := range alerts {
if alert.Labels["severity"] == "critical" {
require.Contains(t, alert.Status.MutedBy, mwScopeOnly.String())
} else {
require.NotContains(t, alert.Status.MutedBy, mwScopeOnly.String())
require.Empty(t, alert.Status.MutedBy)
}
}
})

View File

@@ -89,7 +89,6 @@ func (r *maintenance) CreatePlannedMaintenance(ctx context.Context, maintenance
Description: maintenance.Description,
Schedule: maintenance.Schedule,
OrgID: claims.OrgID,
Scope: maintenance.Scope,
}
maintenanceRules := make([]*alertmanagertypes.StorablePlannedMaintenanceRule, 0)
@@ -124,6 +123,7 @@ func (r *maintenance) CreatePlannedMaintenance(ctx context.Context, maintenance
NewInsert().
Model(&maintenanceRules).
Exec(ctx)
if err != nil {
return err
}
@@ -141,7 +141,6 @@ func (r *maintenance) CreatePlannedMaintenance(ctx context.Context, maintenance
Description: storablePlannedMaintenance.Description,
Schedule: storablePlannedMaintenance.Schedule,
RuleIDs: maintenance.AlertIds,
Scope: maintenance.Scope,
CreatedAt: storablePlannedMaintenance.CreatedAt,
CreatedBy: storablePlannedMaintenance.CreatedBy,
UpdatedAt: storablePlannedMaintenance.UpdatedAt,
@@ -190,7 +189,6 @@ func (r *maintenance) UpdatePlannedMaintenance(ctx context.Context, maintenance
Description: maintenance.Description,
Schedule: maintenance.Schedule,
OrgID: claims.OrgID,
Scope: maintenance.Scope,
}
storablePlannedMaintenanceRules := make([]*alertmanagertypes.StorablePlannedMaintenanceRule, 0)
@@ -226,6 +224,7 @@ func (r *maintenance) UpdatePlannedMaintenance(ctx context.Context, maintenance
Model(new(alertmanagertypes.StorablePlannedMaintenanceRule)).
Where("planned_maintenance_id = ?", storablePlannedMaintenance.ID.StringValue()).
Exec(ctx)
if err != nil {
return err
}
@@ -242,6 +241,7 @@ func (r *maintenance) UpdatePlannedMaintenance(ctx context.Context, maintenance
}
return nil
})
if err != nil {
return err

View File

@@ -235,20 +235,66 @@ func (r *provider) Match(ctx context.Context, orgID string, ruleID string, set m
return matchedChannels, nil
}
// convertLabelSetToEnv delegates to alertmanagertypes.ConvertLabelSetToEnv and
// logs when a key is a prefix of another (e.g. "foo" alongside "foo.bar").
// convertLabelSetToEnv converts a flat label set with dotted keys into a nested map structure for expr env.
// when both a leaf and a deeper nested path exist (e.g. "foo" and "foo.bar"),
// the nested structure takes precedence. That means we will replace an existing leaf at any
// intermediate path with a map so we can materialize the deeper structure.
// TODO(srikanthccv): we need a better solution to handle this, remove the following
// when we update the expr to support dotted keys.
func (r *provider) convertLabelSetToEnv(ctx context.Context, labelSet model.LabelSet) map[string]interface{} {
outer:
for lk := range labelSet {
prefix := string(lk) + "."
for lk2 := range labelSet {
if strings.HasPrefix(string(lk2), prefix) {
r.settings.Logger().InfoContext(ctx, "found label set with conflicting prefix dotted keys", slog.Any("labels", labelSet))
break outer
env := make(map[string]interface{})
logForReview := false
for lk, lv := range labelSet {
key := strings.TrimSpace(string(lk))
value := string(lv)
if strings.Contains(key, ".") {
parts := strings.Split(key, ".")
current := env
for i, raw := range parts {
part := strings.TrimSpace(raw)
last := i == len(parts)-1
if last {
if _, isMap := current[part].(map[string]interface{}); isMap {
logForReview = true
// deeper structure already exists; do not overwrite.
break
}
current[part] = value
break
}
// ensure a map so we can keep descending.
if nextMap, ok := current[part].(map[string]interface{}); ok {
current = nextMap
continue
}
// if absent or a leaf, replace it with a map.
newMap := make(map[string]interface{})
current[part] = newMap
current = newMap
}
continue
}
// if a map already sits here (due to nested keys), keep the map (nested wins).
if _, isMap := env[key].(map[string]interface{}); isMap {
logForReview = true
continue
}
env[key] = value
}
return alertmanagertypes.ConvertLabelSetToEnv(labelSet)
if logForReview {
r.settings.Logger().InfoContext(ctx, "found label set with conflicting prefix dotted keys", slog.Any("labels", labelSet))
}
return env
}
func (r *provider) evaluateExpr(ctx context.Context, expression string, labelSet model.LabelSet) (bool, error) {

View File

@@ -925,3 +925,72 @@ func TestProvider_CreateRoutes(t *testing.T) {
})
}
}
func TestConvertLabelSetToEnv(t *testing.T) {
tests := []struct {
name string
labelSet model.LabelSet
expected map[string]interface{}
}{
{
name: "simple keys",
labelSet: model.LabelSet{
"key1": "value1",
"key2": "value2",
},
expected: map[string]interface{}{
"key1": "value1",
"key2": "value2",
},
},
{
name: "nested keys",
labelSet: model.LabelSet{
"foo.bar": "value1",
"foo.baz": "value2",
},
expected: map[string]interface{}{
"foo": map[string]interface{}{
"bar": "value1",
"baz": "value2",
},
},
},
{
name: "conflict - nested structure wins",
labelSet: model.LabelSet{
"foo.bar.baz": "deep",
"foo.bar": "shallow",
},
expected: map[string]interface{}{
"foo": map[string]interface{}{
"bar": map[string]interface{}{
"baz": "deep",
},
},
},
},
{
name: "conflict - leaf value vs nested",
labelSet: model.LabelSet{
"foo.bar": "value",
"foo": "should_be_ignored",
},
expected: map[string]interface{}{
"foo": map[string]interface{}{
"bar": "value",
},
},
},
}
provider := &provider{
settings: factory.NewScopedProviderSettings(createTestProviderSettings(), "provider_test"),
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := provider.convertLabelSetToEnv(context.Background(), tt.labelSet)
assert.Equal(t, tt.expected, result)
})
}
}

View File

@@ -2,8 +2,6 @@ package envprovider
import (
"context"
"os"
"strings"
"testing"
"github.com/SigNoz/signoz/pkg/config"
@@ -11,21 +9,7 @@ import (
"github.com/stretchr/testify/require"
)
// clearSignozEnv unsets all existing SIGNOZ_* env vars for the duration of the test.
func clearSignozEnv(t *testing.T) {
t.Helper()
for _, kv := range os.Environ() {
if strings.HasPrefix(kv, prefix) {
key := strings.SplitN(kv, "=", 2)[0]
orig, _ := os.LookupEnv(key)
os.Unsetenv(key)
t.Cleanup(func() { os.Setenv(key, orig) })
}
}
}
func TestGetWithStrings(t *testing.T) {
clearSignozEnv(t)
t.Setenv("SIGNOZ_K1_K2", "string")
t.Setenv("SIGNOZ_K3__K4", "string")
t.Setenv("SIGNOZ_K5__K6_K7__K8", "string")
@@ -47,7 +31,6 @@ func TestGetWithStrings(t *testing.T) {
}
func TestGetWithNoPrefix(t *testing.T) {
clearSignozEnv(t)
t.Setenv("K1_K2", "string")
t.Setenv("K3_K4", "string")
expected := map[string]any{}
@@ -60,7 +43,6 @@ func TestGetWithNoPrefix(t *testing.T) {
}
func TestGetWithGoTypes(t *testing.T) {
clearSignozEnv(t)
t.Setenv("SIGNOZ_BOOL", "true")
t.Setenv("SIGNOZ_STRING", "string")
t.Setenv("SIGNOZ_INT", "1")

View File

@@ -199,7 +199,16 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
return q.executeWindowList(ctx)
}
stmt, err := q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec, q.variables)
fromMS, toMS := q.fromMS, q.toMS
if q.spec.Signal == telemetrytypes.SignalTraces || q.spec.Signal == telemetrytypes.SignalLogs {
var overlap bool
fromMS, toMS, overlap = q.narrowWindowByTraceID(ctx, fromMS, toMS)
if !overlap {
return emptyResultFor(q.kind, q.spec.Name), nil
}
}
stmt, err := q.stmtBuilder.Build(ctx, fromMS, toMS, q.kind, q.spec, q.variables)
if err != nil {
return nil, err
}
@@ -215,6 +224,81 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
return result, nil
}
// narrowWindowByTraceID inspects the filter for trace_id predicates and clamps
// [fromMS,toMS] to the time range stored in signoz_traces.distributed_trace_summary.
// Returns the (possibly narrowed) window and overlap=false when the trace lies
// completely outside the query window — callers should short-circuit in that case.
//
// When the trace_id is not present in trace_summary the behaviour differs by
// signal:
// - traces: trace_summary is derived from the spans table, so a missing row
// means no spans exist for that trace_id; we short-circuit to empty.
// - logs: logs can carry a trace_id even when traces are not ingested at all
// (e.g. traces disabled). We must not short-circuit; instead leave the
// window untouched and let the query run.
func (q *builderQuery[T]) narrowWindowByTraceID(ctx context.Context, fromMS, toMS uint64) (uint64, uint64, bool) {
if q.spec.Filter == nil || q.spec.Filter.Expression == "" {
return fromMS, toMS, true
}
traceIDs, found := telemetrytraces.ExtractTraceIDsFromFilter(q.spec.Filter.Expression)
if !found || len(traceIDs) == 0 {
return fromMS, toMS, true
}
finder := telemetrytraces.NewTraceTimeRangeFinder(q.telemetryStore)
traceStart, traceEnd, ok := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
if !ok {
if q.spec.Signal == telemetrytypes.SignalTraces {
q.logger.DebugContext(ctx, "trace_id not found in trace_summary; short-circuiting traces query to empty",
slog.Any("trace_ids", traceIDs))
return fromMS, toMS, false
}
q.logger.DebugContext(ctx, "trace_id not found in trace_summary; leaving time range untouched for logs",
slog.Any("trace_ids", traceIDs))
return fromMS, toMS, true
}
traceStartMS := uint64(traceStart) / 1_000_000
traceEndMS := uint64(traceEnd) / 1_000_000
if traceStartMS == 0 || traceEndMS == 0 {
return fromMS, toMS, true
}
if traceStartMS > toMS || traceEndMS < fromMS {
return fromMS, toMS, false
}
if traceStartMS > fromMS {
fromMS = traceStartMS
}
if traceEndMS < toMS {
toMS = traceEndMS
}
q.logger.DebugContext(ctx, "optimized time range using trace_id lookup",
slog.String("signal", q.spec.Signal.StringValue()),
slog.Any("trace_ids", traceIDs),
slog.Uint64("start", fromMS),
slog.Uint64("end", toMS))
return fromMS, toMS, true
}
// emptyResultFor returns an empty result payload appropriate for the given kind.
func emptyResultFor(kind qbtypes.RequestType, queryName string) *qbtypes.Result {
var value any
switch kind {
case qbtypes.RequestTypeTimeSeries:
value = &qbtypes.TimeSeriesData{QueryName: queryName}
case qbtypes.RequestTypeScalar:
value = &qbtypes.ScalarData{QueryName: queryName}
default:
value = &qbtypes.RawData{QueryName: queryName}
}
return &qbtypes.Result{
Type: kind,
Value: value,
}
}
// executeWithContext executes the query with query window and step context for partial value detection.
func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string, args []any) (*qbtypes.Result, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
@@ -310,42 +394,22 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
totalBytes := uint64(0)
start := time.Now()
// Check if filter contains trace_id(s) and optimize time range if needed
if q.spec.Signal == telemetrytypes.SignalTraces &&
q.spec.Filter != nil && q.spec.Filter.Expression != "" {
traceIDs, found := telemetrytraces.ExtractTraceIDsFromFilter(q.spec.Filter.Expression)
if found && len(traceIDs) > 0 {
finder := telemetrytraces.NewTraceTimeRangeFinder(q.telemetryStore)
traceStart, traceEnd, ok := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
traceStartMS := uint64(traceStart) / 1_000_000
traceEndMS := uint64(traceEnd) / 1_000_000
if !ok {
q.logger.DebugContext(ctx, "failed to get trace time range", slog.Any("trace_ids", traceIDs))
} else if traceStartMS > 0 && traceEndMS > 0 {
// no overlap — nothing to return
if uint64(traceStartMS) > toMS || uint64(traceEndMS) < fromMS {
return &qbtypes.Result{
Type: qbtypes.RequestTypeRaw,
Value: &qbtypes.RawData{
QueryName: q.spec.Name,
},
Stats: qbtypes.ExecStats{
DurationMS: uint64(time.Since(start).Milliseconds()),
},
}, nil
}
// clamp window to trace time range before bucketing
if uint64(traceStartMS) > fromMS {
fromMS = uint64(traceStartMS)
}
if uint64(traceEndMS) < toMS {
toMS = uint64(traceEndMS)
}
q.logger.DebugContext(ctx, "optimized time range for traces", slog.Any("trace_ids", traceIDs), slog.Uint64("start", fromMS), slog.Uint64("end", toMS))
}
// Check if filter contains trace_id(s) and optimize time range if needed.
// Applies to both traces (the listing this branch was built for) and logs
// (which carry trace_id and benefit from the same clamp before bucketing).
if q.spec.Signal == telemetrytypes.SignalTraces || q.spec.Signal == telemetrytypes.SignalLogs {
var overlap bool
fromMS, toMS, overlap = q.narrowWindowByTraceID(ctx, fromMS, toMS)
if !overlap {
return &qbtypes.Result{
Type: qbtypes.RequestTypeRaw,
Value: &qbtypes.RawData{
QueryName: q.spec.Name,
},
Stats: qbtypes.ExecStats{
DurationMS: uint64(time.Since(start).Milliseconds()),
},
}, nil
}
}

View File

@@ -204,7 +204,6 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewAddTagsFactory(sqlstore, sqlschema),
sqlmigration.NewAddRoleCRUDTuplesFactory(sqlstore),
sqlmigration.NewAddIntegrationDashboardFactory(sqlstore, sqlschema),
sqlmigration.NewAddScopeToPlannedMaintenanceFactory(sqlstore, sqlschema),
sqlmigration.NewAddSourceToDashboardFactory(sqlstore, sqlschema),
)
}

View File

@@ -1,97 +0,0 @@
package sqlmigration
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type addScopeToPlannedMaintenance struct {
sqlstore sqlstore.SQLStore
sqlschema sqlschema.SQLSchema
}
func NewAddScopeToPlannedMaintenanceFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(
factory.MustNewName("add_scope_to_planned"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return &addScopeToPlannedMaintenance{
sqlstore: sqlstore,
sqlschema: sqlschema,
}, nil
},
)
}
func (migration *addScopeToPlannedMaintenance) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addScopeToPlannedMaintenance) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
table, _, err := migration.sqlschema.GetTable(ctx, "planned_maintenance")
if err != nil {
return err
}
column := &sqlschema.Column{
Name: sqlschema.ColumnName("scope"),
DataType: sqlschema.DataTypeText,
Nullable: true,
}
sqls := migration.sqlschema.Operator().AddColumn(table, nil, column, nil)
for _, sql := range sqls {
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
return tx.Commit()
}
func (migration *addScopeToPlannedMaintenance) Down(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
table, _, err := migration.sqlschema.GetTable(ctx, "planned_maintenance")
if err != nil {
return err
}
column := &sqlschema.Column{
Name: sqlschema.ColumnName("scope"),
DataType: sqlschema.DataTypeText,
Nullable: true,
}
sqls := migration.sqlschema.Operator().DropColumn(table, column)
for _, sql := range sqls {
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
return tx.Commit()
}

View File

@@ -3,16 +3,12 @@ package alertmanagertypes
import (
"context"
"encoding/json"
"strings"
"time"
"github.com/expr-lang/expr"
"github.com/prometheus/common/model"
"github.com/uptrace/bun"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
var ErrCodeInvalidPlannedMaintenancePayload = errors.MustNewCode("invalid_planned_maintenance_payload")
@@ -62,7 +58,6 @@ type StorablePlannedMaintenance struct {
Description string `bun:"description,type:text"`
Schedule *Schedule `bun:"schedule,type:text,notnull"`
OrgID string `bun:"org_id,type:text"`
Scope string `bun:"scope,type:text"`
}
type PlannedMaintenance struct {
@@ -71,7 +66,6 @@ type PlannedMaintenance struct {
Description string `json:"description"`
Schedule *Schedule `json:"schedule" required:"true"`
RuleIDs []string `json:"alertIds"`
Scope string `json:"scope,omitempty"`
CreatedAt time.Time `json:"createdAt"`
CreatedBy string `json:"createdBy"`
UpdatedAt time.Time `json:"updatedAt"`
@@ -88,7 +82,6 @@ type PostablePlannedMaintenance struct {
Description string `json:"description"`
Schedule *Schedule `json:"schedule" required:"true"`
AlertIds []string `json:"alertIds"`
Scope string `json:"scope"`
}
func (p *PostablePlannedMaintenance) Validate() error {
@@ -123,11 +116,6 @@ func (p *PostablePlannedMaintenance) Validate() error {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "end time cannot be before start time")
}
}
if p.Scope != "" {
if _, err := expr.Compile(p.Scope, expr.AllowUndefinedVariables(), expr.AsBool()); err != nil {
return errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidPlannedMaintenancePayload, "invalid scope: %v", err)
}
}
return nil
}
@@ -163,7 +151,7 @@ func (m *PlannedMaintenance) HasScheduleRecurrenceBoundsMismatch() bool {
(recurrence.EndTime != nil && !recurrence.EndTime.Equal(m.Schedule.EndTime))
}
func (m *PlannedMaintenance) ShouldSkip(ruleID string, now time.Time, lset model.LabelSet) bool {
func (m *PlannedMaintenance) ShouldSkip(ruleID string, now time.Time) bool {
// Check if the alert ID is in the maintenance window
found := false
if len(m.RuleIDs) > 0 {
@@ -183,23 +171,6 @@ func (m *PlannedMaintenance) ShouldSkip(ruleID string, now time.Time, lset model
return false
}
if !m.isScheduleActive(now) {
return false
}
// lset is empty when called from IsActive (no instance labels available);
// skip expression filtering in that case.
if m.Scope != "" && len(lset) != 0 {
if !evalScopeExpression(m.Scope, lset) {
return false
}
}
return true
}
// isScheduleActive reports whether now falls inside the maintenance window's schedule.
func (m *PlannedMaintenance) isScheduleActive(now time.Time) bool {
// If alert is found, we check if it should be skipped based on the schedule
loc, err := time.LoadLocation(m.Schedule.Timezone)
if err != nil {
@@ -249,59 +220,6 @@ func (m *PlannedMaintenance) isScheduleActive(now time.Time) bool {
return false
}
// ConvertLabelSetToEnv converts a label set into a map suitable for use as an
// expr environment. Dotted keys (e.g. "kubernetes.node") are expanded into
// nested maps so that expr can resolve them without panicking. When a dotted
// path conflicts with a plain key, the nested structure takes precedence.
func ConvertLabelSetToEnv(lset model.LabelSet) map[string]any {
env := map[string]any{}
for lk, lv := range lset {
key := strings.TrimSpace(string(lk))
value := string(lv)
if strings.Contains(key, ".") {
parts := strings.Split(key, ".")
current := env
for i, raw := range parts {
part := strings.TrimSpace(raw)
if i == len(parts)-1 {
if _, isMap := current[part].(map[string]any); !isMap {
current[part] = value
}
break
}
if nextMap, ok := current[part].(map[string]any); ok {
current = nextMap
} else {
newMap := map[string]any{}
current[part] = newMap
current = newMap
}
}
continue
}
if _, isMap := env[key].(map[string]any); !isMap {
env[key] = value
}
}
return env
}
// evalScopeExpression compiles and runs the expression against the provided labels.
// Returns false on any error (safety-first: don't suppress on a bad expression).
func evalScopeExpression(expression string, lset model.LabelSet) bool {
env := ConvertLabelSetToEnv(lset)
program, err := expr.Compile(expression, expr.Env(env), expr.AllowUndefinedVariables())
if err != nil {
return false
}
output, err := expr.Run(program, env)
if err != nil {
return false
}
result, ok := output.(bool)
return ok && result
}
// checkDaily rebases the recurrence start to today (or yesterday if needed)
// and returns true if currentTime is within [candidate, candidate+Duration].
func (m *PlannedMaintenance) checkDaily(currentTime time.Time, rec *Recurrence, loc *time.Location) bool {
@@ -388,7 +306,7 @@ func (m *PlannedMaintenance) IsActive(now time.Time) bool {
if len(m.RuleIDs) > 0 {
ruleID = (m.RuleIDs)[0]
}
return m.ShouldSkip(ruleID, now, nil)
return m.ShouldSkip(ruleID, now)
}
func (m *PlannedMaintenance) IsUpcoming() bool {
@@ -471,7 +389,6 @@ func (m PlannedMaintenance) MarshalJSON() ([]byte, error) {
Description string `json:"description" db:"description"`
Schedule *Schedule `json:"schedule" db:"schedule"`
AlertIds []string `json:"alertIds" db:"alert_ids"`
Scope string `json:"scope,omitempty" db:"scope"`
CreatedAt time.Time `json:"createdAt" db:"created_at"`
CreatedBy string `json:"createdBy" db:"created_by"`
UpdatedAt time.Time `json:"updatedAt" db:"updated_at"`
@@ -484,7 +401,6 @@ func (m PlannedMaintenance) MarshalJSON() ([]byte, error) {
Description: m.Description,
Schedule: m.Schedule,
AlertIds: m.RuleIDs,
Scope: m.Scope,
CreatedAt: m.CreatedAt,
CreatedBy: m.CreatedBy,
UpdatedAt: m.UpdatedAt,
@@ -508,7 +424,6 @@ func (m *PlannedMaintenanceWithRules) ToPlannedMaintenance() *PlannedMaintenance
Description: m.Description,
Schedule: m.Schedule,
RuleIDs: ruleIDs,
Scope: m.Scope,
CreatedAt: m.CreatedAt,
UpdatedAt: m.UpdatedAt,
CreatedBy: m.CreatedBy,

View File

@@ -1,12 +1,10 @@
package alertmanagertypes
import (
"reflect"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/common/model"
)
// Helper function to create a time pointer.
@@ -670,330 +668,9 @@ func TestShouldSkipMaintenance(t *testing.T) {
}
for idx, c := range cases {
result := c.maintenance.ShouldSkip(c.name, c.ts, model.LabelSet{})
result := c.maintenance.ShouldSkip(c.name, c.ts)
if result != c.skip {
t.Errorf("skip %v, got %v, case:%d - %s", c.skip, result, idx, c.name)
}
}
}
func TestShouldSkip_Scope(t *testing.T) {
activeSchedule := func() *Schedule {
return &Schedule{
Timezone: "UTC",
StartTime: time.Now().UTC().Add(-time.Hour),
EndTime: time.Now().UTC().Add(time.Hour),
}
}
now := time.Now().UTC()
cases := []struct {
name string
maintenance *PlannedMaintenance
ruleID string
ts time.Time
lset model.LabelSet
skip bool
}{
{
name: "empty scope - no label filtering applied",
maintenance: &PlannedMaintenance{Schedule: activeSchedule()},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "production"},
skip: true,
},
{
name: "scope matches labels",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), Scope: `env == "production"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "production"},
skip: true,
},
{
name: "scope does not match labels",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), Scope: `env == "production"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "staging"},
skip: false,
},
{
name: "AND expression - both conditions match",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), Scope: `env == "production" && service == "api"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "production", "service": "api"},
skip: true,
},
{
name: "AND expression - one condition does not match",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), Scope: `env == "production" && service == "api"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "production", "service": "worker"},
skip: false,
},
{
name: "OR expression - first alternative matches",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), Scope: `env == "production" || env == "staging"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "production"},
skip: true,
},
{
name: "OR expression - second alternative matches",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), Scope: `env == "production" || env == "staging"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "staging"},
skip: true,
},
{
name: "OR expression - neither alternative matches",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), Scope: `env == "production" || env == "staging"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "development"},
skip: false,
},
{
name: "scope references label absent from lset",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), Scope: `env == "production"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"service": "api"},
skip: false,
},
{
name: "in expression - value is in list",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), Scope: `env in ["production", "staging"]`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "staging"},
skip: true,
},
{
name: "in expression - value not in list",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), Scope: `env in ["production", "staging"]`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "development"},
skip: false,
},
{
name: "ruleID in list and scope matches - should skip",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), RuleIDs: []string{"rule-1", "rule-2"}, Scope: `env == "production"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "production"},
skip: true,
},
{
name: "ruleID not in list and scope matches - ruleID gate prevents skip",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), RuleIDs: []string{"rule-2"}, Scope: `env == "production"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "production"},
skip: false,
},
{
name: "ruleID in list but scope does not match - should not skip",
maintenance: &PlannedMaintenance{Schedule: activeSchedule(), RuleIDs: []string{"rule-1"}, Scope: `env == "production"`},
ruleID: "rule-1",
ts: now,
lset: model.LabelSet{"env": "staging"},
skip: false,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got := c.maintenance.ShouldSkip(c.ruleID, c.ts, c.lset)
if got != c.skip {
t.Errorf("ShouldSkip() = %v, want %v", got, c.skip)
}
})
}
}
func TestEvalScopeExpression(t *testing.T) {
cases := []struct {
name string
expression string
lset model.LabelSet
want bool
}{
{
name: "equality match",
expression: `env == "production"`,
lset: model.LabelSet{"env": "production"},
want: true,
},
{
name: "equality no match",
expression: `env == "production"`,
lset: model.LabelSet{"env": "staging"},
want: false,
},
{
name: "inequality match",
expression: `env != "production"`,
lset: model.LabelSet{"env": "staging"},
want: true,
},
{
name: "AND - both match",
expression: `env == "production" && service == "api"`,
lset: model.LabelSet{"env": "production", "service": "api"},
want: true,
},
{
name: "AND - partial match",
expression: `env == "production" && service == "api"`,
lset: model.LabelSet{"env": "production", "service": "worker"},
want: false,
},
{
name: "OR - first matches",
expression: `env == "production" || env == "staging"`,
lset: model.LabelSet{"env": "production"},
want: true,
},
{
name: "OR - second matches",
expression: `env == "production" || env == "staging"`,
lset: model.LabelSet{"env": "staging"},
want: true,
},
{
name: "OR - none match",
expression: `env == "production" || env == "staging"`,
lset: model.LabelSet{"env": "development"},
want: false,
},
{
name: "undefined label returns false",
expression: `env == "production"`,
lset: model.LabelSet{"service": "api"},
want: false,
},
{
name: "in list - present",
expression: `env in ["production", "staging"]`,
lset: model.LabelSet{"env": "production"},
want: true,
},
{
name: "in list - absent",
expression: `env in ["production", "staging"]`,
lset: model.LabelSet{"env": "development"},
want: false,
},
{
name: "invalid expression returns false",
expression: `env ==`,
lset: model.LabelSet{"env": "production"},
want: false,
},
{
name: "non-bool expression returns false",
expression: `env`,
lset: model.LabelSet{"env": "production"},
want: false,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got := evalScopeExpression(c.expression, c.lset)
if got != c.want {
t.Errorf("evalScopeExpression(%q, %v) = %v, want %v", c.expression, c.lset, got, c.want)
}
})
}
}
func TestPostablePlannedMaintenance_ValidateScope(t *testing.T) {
validSchedule := &Schedule{
Timezone: "UTC",
StartTime: time.Now().UTC(),
EndTime: time.Now().UTC().Add(time.Hour),
}
cases := []struct {
name string
scope string
wantErr bool
}{
{name: "empty scope", scope: "", wantErr: false},
{name: "simple equality", scope: `env == "production"`, wantErr: false},
{name: "AND expression", scope: `env == "production" && service == "api"`, wantErr: false},
{name: "OR expression", scope: `env == "production" || env == "staging"`, wantErr: false},
{name: "in expression", scope: `env in ["production", "staging"]`, wantErr: false},
{name: "incomplete expression", scope: `env ==`, wantErr: true},
{name: "non-bool expression", scope: `"just a string"`, wantErr: true},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
p := &PostablePlannedMaintenance{
Name: "test",
Schedule: validSchedule,
Scope: c.scope,
}
err := p.Validate()
if (err != nil) != c.wantErr {
t.Errorf("Validate() error = %v, wantErr %v", err, c.wantErr)
}
})
}
}
func TestConvertLabelSetToEnv(t *testing.T) {
cases := []struct {
name string
lset model.LabelSet
expected map[string]interface{}
}{
{
name: "simple keys",
lset: model.LabelSet{"key1": "value1", "key2": "value2"},
expected: map[string]interface{}{"key1": "value1", "key2": "value2"},
},
{
name: "dotted keys become nested maps",
lset: model.LabelSet{"foo.bar": "value1", "foo.baz": "value2"},
expected: map[string]interface{}{
"foo": map[string]interface{}{"bar": "value1", "baz": "value2"},
},
},
{
name: "deeper dotted key wins over shallow dotted key",
lset: model.LabelSet{"foo.bar.baz": "deep", "foo.bar": "shallow"},
expected: map[string]interface{}{
"foo": map[string]interface{}{
"bar": map[string]interface{}{"baz": "deep"},
},
},
},
{
name: "nested structure wins over plain key",
lset: model.LabelSet{"foo.bar": "value", "foo": "ignored"},
expected: map[string]interface{}{
"foo": map[string]interface{}{"bar": "value"},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got := ConvertLabelSetToEnv(c.lset)
if !reflect.DeepEqual(got, c.expected) {
t.Errorf("ConvertLabelSetToEnv() = %v, want %v", got, c.expected)
}
})
}
}

View File

@@ -108,7 +108,6 @@ func (s *Schedule) UnmarshalJSON(data []byte) error {
if err != nil {
return err
}
// TODO(jatinderjit): if endTime.IsZero() then we should not set the endTime
s.EndTime = time.Date(endTime.Year(), endTime.Month(), endTime.Day(), endTime.Hour(), endTime.Minute(), endTime.Second(), endTime.Nanosecond(), loc)
}

View File

@@ -20,6 +20,7 @@ from fixtures.querier import (
index_series_by_label,
make_query_request,
)
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
def test_logs_list(
@@ -2293,3 +2294,333 @@ def test_logs_formula_orderby_and_limit(
assert len(f3_services) == 3, f"F3: expected 3 rows after limit, got {len(f3_services)}"
assert f3_values == f4_values[:3], f"F3 values {f3_values} do not match F4[:3] values {f4_values[:3]}"
assert set(f3_services) == set(f4_services[:3]), f"F3 services {f3_services} do not match F4[:3] services {f4_services[:3]}"
def test_logs_list_filter_by_trace_id(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[list[Logs]], None],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Tests that filtering logs by trace_id uses the trace_summary lookup to
narrow the query window before scanning the logs table:
1. Returns the matching log (narrow window, single bucket).
2. Does not return duplicate logs when the query window spans multiple
exponential buckets (>1 h).
3. Returns no results when the query window does not contain the trace.
4. Logs carrying a trace_id whose trace is NOT in trace_summary (e.g.
traces disabled) are still returned — the lookup miss must not
short-circuit logs queries.
"""
target_trace_id = TraceIdGenerator.trace_id()
other_trace_id = TraceIdGenerator.trace_id()
orphan_trace_id = TraceIdGenerator.trace_id()
target_root_span_id = TraceIdGenerator.span_id()
target_child_span_id = TraceIdGenerator.span_id()
other_span_id = TraceIdGenerator.span_id()
orphan_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "logs-trace-filter-service",
"cloud.provider": "integration",
}
# Populate signoz_traces.distributed_trace_summary by inserting spans for
# the target trace_id. trace_summary records min/max of span timestamps
# (it ignores span duration), so two spans are inserted to give the trace
# a non-trivial recorded window of [now-10s, now-5s].
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_root_span_id,
parent_span_id="",
name="root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
Traces(
timestamp=now - timedelta(seconds=5),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_child_span_id,
parent_span_id=target_root_span_id,
name="child-span",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
# Insert logs:
# - one with the target trace_id, at a timestamp within the trace's
# recorded window (now-10s..now-5s, padded ±1s).
# - one with a different trace_id; must never appear in target_trace_id
# results.
# - one with an orphan trace_id whose trace was never ingested — used to
# verify the lookup miss does NOT short-circuit logs queries.
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=7),
resources=common_resources,
attributes={"http.method": "GET"},
body="log inside the target trace window",
severity_text="INFO",
trace_id=target_trace_id,
span_id=target_root_span_id,
),
Logs(
timestamp=now - timedelta(seconds=3),
resources=common_resources,
attributes={"http.method": "POST"},
body="log with a different trace_id",
severity_text="INFO",
trace_id=other_trace_id,
span_id=other_span_id,
),
Logs(
timestamp=now - timedelta(seconds=2),
resources=common_resources,
attributes={"http.method": "PUT"},
body="log with a trace_id absent from trace_summary",
severity_text="INFO",
trace_id=orphan_trace_id,
span_id=orphan_span_id,
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
def _query(start_ms: int, end_ms: int, trace_id: str) -> list:
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="raw",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"disabled": False,
"limit": 100,
"offset": 0,
"filter": {"expression": f"trace_id = '{trace_id}'"},
"order": [
{"key": {"name": "timestamp"}, "direction": "desc"},
{"key": {"name": "id"}, "direction": "desc"},
],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
return response.json()["data"]["data"]["results"][0]["rows"] or []
now_ms = int(now.timestamp() * 1000)
# --- Test 1: narrow window (single bucket, <1 h) ---
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
narrow_rows = _query(narrow_start_ms, now_ms, target_trace_id)
assert len(narrow_rows) == 1, f"Expected 1 log for trace_id filter (narrow window), got {len(narrow_rows)}"
assert narrow_rows[0]["data"]["trace_id"] == target_trace_id
assert narrow_rows[0]["data"]["span_id"] == target_root_span_id
# --- Test 2: wide window (>1 h, triggers multiple exponential buckets) ---
# Should still return exactly one log — no duplicates from multi-bucket scan.
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_rows = _query(wide_start_ms, now_ms, target_trace_id)
assert len(wide_rows) == 1, f"Expected 1 log for trace_id filter (wide window, multi-bucket), got {len(wide_rows)} — possible duplicate-log regression"
assert wide_rows[0]["data"]["trace_id"] == target_trace_id
assert wide_rows[0]["data"]["span_id"] == target_root_span_id
# --- Test 3: window that does not contain the trace returns no results ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_rows = _query(past_start_ms, past_end_ms, target_trace_id)
assert len(past_rows) == 0, f"Expected 0 logs for trace_id filter outside time window, got {len(past_rows)}"
# --- Test 4: trace_id not present in trace_summary still returns logs ---
orphan_rows = _query(narrow_start_ms, now_ms, orphan_trace_id)
assert len(orphan_rows) == 1, f"Expected 1 log for orphan trace_id (no trace_summary entry), got {len(orphan_rows)} — logs query may have been incorrectly short-circuited"
assert orphan_rows[0]["data"]["trace_id"] == orphan_trace_id
def test_logs_aggregation_filter_by_trace_id(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[list[Logs]], None],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Tests that the trace_id time-range optimization also applies to
non-window-list (time_series / aggregation) logs queries:
1. Wide query window containing the trace returns the correct count.
2. Query window outside the trace's time range short-circuits to an
empty result.
3. A trace_id with no row in trace_summary (e.g. traces disabled) still
returns the matching logs — the lookup miss must not short-circuit
logs aggregation queries.
"""
target_trace_id = TraceIdGenerator.trace_id()
orphan_trace_id = TraceIdGenerator.trace_id()
target_root_span_id = TraceIdGenerator.span_id()
target_child_span_id = TraceIdGenerator.span_id()
orphan_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "logs-trace-agg-service",
"cloud.provider": "integration",
}
# trace_summary records min/max of span timestamps (it ignores duration),
# so insert two spans to give the trace a recorded window wide enough to
# comfortably contain the log timestamps below.
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_root_span_id,
parent_span_id="",
name="root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
Traces(
timestamp=now - timedelta(seconds=5),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_child_span_id,
parent_span_id=target_root_span_id,
name="child-span",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
# Two logs for the target trace_id, both inside the recorded trace window.
# One additional log carries an orphan trace_id with no row in
# trace_summary — used to verify that the lookup miss does not
# short-circuit logs aggregations.
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=9),
resources=common_resources,
attributes={},
body="log A inside trace window",
severity_text="INFO",
trace_id=target_trace_id,
span_id=target_root_span_id,
),
Logs(
timestamp=now - timedelta(seconds=6),
resources=common_resources,
attributes={},
body="log B inside trace window",
severity_text="INFO",
trace_id=target_trace_id,
span_id=target_root_span_id,
),
Logs(
timestamp=now - timedelta(seconds=2),
resources=common_resources,
attributes={},
body="log with a trace_id absent from trace_summary",
severity_text="INFO",
trace_id=orphan_trace_id,
span_id=orphan_span_id,
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
def _count(start_ms: int, end_ms: int, trace_id: str) -> float:
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="time_series",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": False,
"filter": {"expression": f"trace_id = '{trace_id}'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0].get("aggregations") or []
if not aggregations:
return 0
series = aggregations[0].get("series") or []
if not series:
return 0
return sum(v["value"] for v in series[0]["values"])
now_ms = int(now.timestamp() * 1000)
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
# --- Test 1: wide window (>1 h) containing the trace returns 2 logs ---
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_count = _count(wide_start_ms, now_ms, target_trace_id)
assert wide_count == 2, f"Expected count=2 for trace_id aggregation (wide window), got {wide_count}"
# --- Test 2: window outside the trace short-circuits to empty ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_count = _count(past_start_ms, past_end_ms, target_trace_id)
assert past_count == 0, f"Expected count=0 for trace_id aggregation outside time window, got {past_count}"
# --- Test 3: trace_id not present in trace_summary still returns logs ---
orphan_count = _count(narrow_start_ms, now_ms, orphan_trace_id)
assert orphan_count == 1, f"Expected count=1 for orphan trace_id aggregation, got {orphan_count} — query may have been incorrectly short-circuited"

View File

@@ -2123,3 +2123,116 @@ def test_traces_list_filter_by_trace_id(
past_rows = _query(past_start_ms, past_end_ms)
assert len(past_rows) == 0, f"Expected 0 spans for trace_id filter outside time window, got {len(past_rows)}"
def test_traces_aggregation_filter_by_trace_id(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Tests that the trace_id time-range optimization also applies to
non-window-list (time_series / aggregation) traces queries:
1. Wide query window containing the trace returns the correct count.
2. Query window outside the trace's time range short-circuits to empty.
3. Filter referencing a trace_id with no row in trace_summary
short-circuits to empty (trace_summary is authoritative for traces).
"""
target_trace_id = TraceIdGenerator.trace_id()
target_root_span_id = TraceIdGenerator.span_id()
target_child_span_id = TraceIdGenerator.span_id()
missing_trace_id = TraceIdGenerator.trace_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "traces-agg-filter-service",
"cloud.provider": "integration",
}
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=5),
trace_id=target_trace_id,
span_id=target_root_span_id,
parent_span_id="",
name="root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={"http.request.method": "GET"},
),
Traces(
timestamp=now - timedelta(seconds=9),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_child_span_id,
parent_span_id=target_root_span_id,
name="child-span",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
def _count(start_ms: int, end_ms: int, trace_id: str) -> float:
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="time_series",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": False,
"filter": {"expression": f"trace_id = '{trace_id}'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0].get("aggregations") or []
if not aggregations:
return 0
series = aggregations[0].get("series") or []
if not series:
return 0
return sum(v["value"] for v in series[0]["values"])
now_ms = int(now.timestamp() * 1000)
# --- Test 1: wide window (>1 h) containing the trace returns both spans ---
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_count = _count(wide_start_ms, now_ms, target_trace_id)
assert wide_count == 2, f"Expected count=2 for trace_id aggregation (wide window), got {wide_count}"
# --- Test 2: window outside the trace short-circuits to empty ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_count = _count(past_start_ms, past_end_ms, target_trace_id)
assert past_count == 0, f"Expected count=0 for trace_id aggregation outside time window, got {past_count}"
# --- Test 3: trace_id with no entry in trace_summary short-circuits ---
missing_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
missing_count = _count(missing_start_ms, now_ms, missing_trace_id)
assert missing_count == 0, f"Expected count=0 for trace_id absent from trace_summary, got {missing_count}"