mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-21 07:40:28 +01:00
Compare commits
1 Commits
main
...
issue_1170
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4744f83cfe |
@@ -141,6 +141,10 @@ querier:
|
||||
flux_interval: 5m
|
||||
# The maximum number of concurrent queries for missing ranges.
|
||||
max_concurrent_queries: 4
|
||||
# When filtering logs by trace_id, clamp the query window to the trace time
|
||||
# range with padding to include slightly delayed log exports. Logs only; set
|
||||
# to 0 to disable.
|
||||
log_trace_id_window_padding: 5m
|
||||
|
||||
##################### TelemetryStore #####################
|
||||
telemetrystore:
|
||||
|
||||
@@ -31,6 +31,8 @@ type builderQuery[T any] struct {
|
||||
fromMS uint64
|
||||
toMS uint64
|
||||
kind qbtypes.RequestType
|
||||
|
||||
logTraceIDWindowPaddingMS uint64
|
||||
}
|
||||
|
||||
var _ qbtypes.Query = (*builderQuery[any])(nil)
|
||||
@@ -43,16 +45,18 @@ func newBuilderQuery[T any](
|
||||
tr qbtypes.TimeRange,
|
||||
kind qbtypes.RequestType,
|
||||
variables map[string]qbtypes.VariableItem,
|
||||
logTraceIDWindowPaddingMS uint64,
|
||||
) *builderQuery[T] {
|
||||
return &builderQuery[T]{
|
||||
logger: logger,
|
||||
telemetryStore: telemetryStore,
|
||||
stmtBuilder: stmtBuilder,
|
||||
spec: spec,
|
||||
variables: variables,
|
||||
fromMS: tr.From,
|
||||
toMS: tr.To,
|
||||
kind: kind,
|
||||
logger: logger,
|
||||
telemetryStore: telemetryStore,
|
||||
stmtBuilder: stmtBuilder,
|
||||
spec: spec,
|
||||
variables: variables,
|
||||
fromMS: tr.From,
|
||||
toMS: tr.To,
|
||||
kind: kind,
|
||||
logTraceIDWindowPaddingMS: logTraceIDWindowPaddingMS,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -277,9 +281,20 @@ func (q *builderQuery[T]) narrowWindowByTraceID(ctx context.Context, fromMS, toM
|
||||
return fromMS, toMS, true, ""
|
||||
}
|
||||
|
||||
// Logs can be flushed slightly after the span ends. The trace
|
||||
// time range comes from the spans table, so for logs we widen it by the
|
||||
// configured padding before clamping. Keep the actual recorded bounds for
|
||||
// the user-facing warning so it reports where the trace truly lies, not the
|
||||
// padded range.
|
||||
actualStartMS, actualEndMS := traceStartMS, traceEndMS
|
||||
if q.spec.Signal == telemetrytypes.SignalLogs {
|
||||
traceStartMS -= q.logTraceIDWindowPaddingMS
|
||||
traceEndMS += q.logTraceIDWindowPaddingMS
|
||||
}
|
||||
|
||||
if traceStartMS > toMS || traceEndMS < fromMS {
|
||||
traceStartUTC := time.UnixMilli(int64(traceStartMS)).UTC().Format(time.RFC3339)
|
||||
traceEndUTC := time.UnixMilli(int64(traceEndMS)).UTC().Format(time.RFC3339)
|
||||
traceStartUTC := time.UnixMilli(int64(actualStartMS)).UTC().Format(time.RFC3339)
|
||||
traceEndUTC := time.UnixMilli(int64(actualEndMS)).UTC().Format(time.RFC3339)
|
||||
return fromMS, toMS, false, fmt.Sprintf(traceOutsideRangeWarn, q.spec.Name, traceStartUTC, traceEndUTC)
|
||||
}
|
||||
if traceStartMS > fromMS {
|
||||
|
||||
@@ -23,6 +23,8 @@ type Config struct {
|
||||
MaxConcurrentQueries int `yaml:"max_concurrent_queries" mapstructure:"max_concurrent_queries"`
|
||||
// SkipResourceFingerprint configures when the resource fingerprint subquery is skipped in favor of main-table filtering.
|
||||
SkipResourceFingerprint SkipResourceFingerprint `yaml:"skip_resource_fingerprint" mapstructure:"skip_resource_fingerprint"`
|
||||
// LogTraceIDWindowPadding is the padding added to narrowed down timerange from trace summary to logs with trace_id filter.
|
||||
LogTraceIDWindowPadding time.Duration `yaml:"log_trace_id_window_padding" mapstructure:"log_trace_id_window_padding"`
|
||||
}
|
||||
|
||||
// NewConfigFactory creates a new config factory for querier.
|
||||
@@ -40,6 +42,7 @@ func newConfig() factory.Config {
|
||||
Enabled: false,
|
||||
Threshold: 100000,
|
||||
},
|
||||
LogTraceIDWindowPadding: 5 * time.Minute,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,6 +60,9 @@ func (c Config) Validate() error {
|
||||
if c.SkipResourceFingerprint.Enabled && c.SkipResourceFingerprint.Threshold == 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "skip_resource_fingerprint.threshold must be > 0 when enabled")
|
||||
}
|
||||
if c.LogTraceIDWindowPadding < 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "log_trace_id_window_padding must not be negative, got %v", c.LogTraceIDWindowPadding)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -35,19 +35,20 @@ var (
|
||||
)
|
||||
|
||||
type querier struct {
|
||||
logger *slog.Logger
|
||||
fl flagger.Flagger
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
metadataStore telemetrytypes.MetadataStore
|
||||
promEngine prometheus.Prometheus
|
||||
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
|
||||
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
|
||||
auditStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
|
||||
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
|
||||
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
|
||||
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder
|
||||
bucketCache BucketCache
|
||||
liveDataRefresh time.Duration
|
||||
logger *slog.Logger
|
||||
fl flagger.Flagger
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
metadataStore telemetrytypes.MetadataStore
|
||||
promEngine prometheus.Prometheus
|
||||
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
|
||||
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
|
||||
auditStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
|
||||
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
|
||||
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
|
||||
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder
|
||||
bucketCache BucketCache
|
||||
liveDataRefresh time.Duration
|
||||
logTraceIDWindowPaddingMS uint64
|
||||
}
|
||||
|
||||
var _ Querier = (*querier)(nil)
|
||||
@@ -65,22 +66,24 @@ func New(
|
||||
traceOperatorStmtBuilder qbtypes.TraceOperatorStatementBuilder,
|
||||
bucketCache BucketCache,
|
||||
flagger flagger.Flagger,
|
||||
logTraceIDWindowPadding time.Duration,
|
||||
) *querier {
|
||||
querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
|
||||
return &querier{
|
||||
logger: querierSettings.Logger(),
|
||||
fl: flagger,
|
||||
telemetryStore: telemetryStore,
|
||||
metadataStore: metadataStore,
|
||||
promEngine: promEngine,
|
||||
traceStmtBuilder: traceStmtBuilder,
|
||||
logStmtBuilder: logStmtBuilder,
|
||||
auditStmtBuilder: auditStmtBuilder,
|
||||
metricStmtBuilder: metricStmtBuilder,
|
||||
meterStmtBuilder: meterStmtBuilder,
|
||||
traceOperatorStmtBuilder: traceOperatorStmtBuilder,
|
||||
bucketCache: bucketCache,
|
||||
liveDataRefresh: 5 * time.Second,
|
||||
logger: querierSettings.Logger(),
|
||||
fl: flagger,
|
||||
telemetryStore: telemetryStore,
|
||||
metadataStore: metadataStore,
|
||||
promEngine: promEngine,
|
||||
traceStmtBuilder: traceStmtBuilder,
|
||||
logStmtBuilder: logStmtBuilder,
|
||||
auditStmtBuilder: auditStmtBuilder,
|
||||
metricStmtBuilder: metricStmtBuilder,
|
||||
meterStmtBuilder: meterStmtBuilder,
|
||||
traceOperatorStmtBuilder: traceOperatorStmtBuilder,
|
||||
bucketCache: bucketCache,
|
||||
liveDataRefresh: 5 * time.Second,
|
||||
logTraceIDWindowPaddingMS: uint64(logTraceIDWindowPadding.Milliseconds()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,7 +176,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
|
||||
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
|
||||
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
|
||||
bq := newBuilderQuery(q.logger, q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
|
||||
bq := newBuilderQuery(q.logger, q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType, tmplVars, 0)
|
||||
queries[spec.Name] = bq
|
||||
steps[spec.Name] = spec.StepInterval
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
|
||||
@@ -183,7 +186,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
if spec.Source == telemetrytypes.SourceAudit {
|
||||
stmtBuilder = q.auditStmtBuilder
|
||||
}
|
||||
bq := newBuilderQuery(q.logger, q.telemetryStore, stmtBuilder, spec, timeRange, req.RequestType, tmplVars)
|
||||
bq := newBuilderQuery(q.logger, q.telemetryStore, stmtBuilder, spec, timeRange, req.RequestType, tmplVars, q.logTraceIDWindowPaddingMS)
|
||||
queries[spec.Name] = bq
|
||||
steps[spec.Name] = spec.StepInterval
|
||||
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
|
||||
@@ -200,9 +203,9 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
|
||||
if spec.Source == telemetrytypes.SourceMeter {
|
||||
event.Source = telemetrytypes.SourceMeter.StringValue()
|
||||
bq = newBuilderQuery(q.logger, q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
|
||||
bq = newBuilderQuery(q.logger, q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars, 0)
|
||||
} else {
|
||||
bq = newBuilderQuery(q.logger, q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
|
||||
bq = newBuilderQuery(q.logger, q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars, 0)
|
||||
}
|
||||
|
||||
queries[spec.Name] = bq
|
||||
@@ -508,7 +511,7 @@ func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qb
|
||||
"id": {
|
||||
Value: updatedLogID,
|
||||
},
|
||||
})
|
||||
}, q.logTraceIDWindowPaddingMS)
|
||||
queries[spec.Name] = bq
|
||||
|
||||
qbResp, qbErr := q.run(ctx, orgID, queries, req, nil, event, nil)
|
||||
@@ -804,7 +807,7 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
|
||||
specCopy := qt.spec.Copy()
|
||||
specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
|
||||
adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
|
||||
return newBuilderQuery(q.logger, q.telemetryStore, q.traceStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
|
||||
return newBuilderQuery(q.logger, q.telemetryStore, q.traceStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables, 0)
|
||||
|
||||
case *builderQuery[qbtypes.LogAggregation]:
|
||||
specCopy := qt.spec.Copy()
|
||||
@@ -814,16 +817,16 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
|
||||
if qt.spec.Source == telemetrytypes.SourceAudit {
|
||||
shiftStmtBuilder = q.auditStmtBuilder
|
||||
}
|
||||
return newBuilderQuery(q.logger, q.telemetryStore, shiftStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
|
||||
return newBuilderQuery(q.logger, q.telemetryStore, shiftStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables, q.logTraceIDWindowPaddingMS)
|
||||
|
||||
case *builderQuery[qbtypes.MetricAggregation]:
|
||||
specCopy := qt.spec.Copy()
|
||||
specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
|
||||
adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
|
||||
if qt.spec.Source == telemetrytypes.SourceMeter {
|
||||
return newBuilderQuery(q.logger, q.telemetryStore, q.meterStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
|
||||
return newBuilderQuery(q.logger, q.telemetryStore, q.meterStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables, 0)
|
||||
}
|
||||
return newBuilderQuery(q.logger, q.telemetryStore, q.metricStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
|
||||
return newBuilderQuery(q.logger, q.telemetryStore, q.metricStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables, 0)
|
||||
case *traceOperatorQuery:
|
||||
specCopy := qt.spec.Copy()
|
||||
return &traceOperatorQuery{
|
||||
|
||||
@@ -54,6 +54,7 @@ func TestQueryRange_MetricTypeMissing(t *testing.T) {
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flaggertest.New(t), // flagger
|
||||
0,
|
||||
)
|
||||
|
||||
req := &qbtypes.QueryRangeRequest{
|
||||
@@ -124,6 +125,7 @@ func TestQueryRange_MetricTypeFromStore(t *testing.T) {
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flaggertest.New(t), // flagger
|
||||
0,
|
||||
)
|
||||
|
||||
req := &qbtypes.QueryRangeRequest{
|
||||
|
||||
@@ -192,5 +192,6 @@ func newProvider(
|
||||
traceOperatorStmtBuilder,
|
||||
bucketCache,
|
||||
flagger,
|
||||
cfg.LogTraceIDWindowPadding,
|
||||
), nil
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package rules
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
@@ -54,6 +55,7 @@ func prepareQuerierForMetrics(t *testing.T, telemetryStore telemetrystore.Teleme
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
flagger,
|
||||
0,
|
||||
), metadataStore
|
||||
}
|
||||
|
||||
@@ -107,6 +109,7 @@ func prepareQuerierForLogs(t *testing.T, telemetryStore telemetrystore.Telemetry
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
fl,
|
||||
5*time.Minute, // logTraceIDWindowPadding
|
||||
)
|
||||
}
|
||||
|
||||
@@ -154,5 +157,6 @@ func prepareQuerierForTraces(t *testing.T, telemetryStore telemetrystore.Telemet
|
||||
nil, // traceOperatorStmtBuilder
|
||||
nil, // bucketCache
|
||||
fl,
|
||||
0,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -2306,9 +2306,11 @@ def test_logs_list_filter_by_trace_id(
|
||||
"""
|
||||
Tests that filtering logs by trace_id uses the trace_summary lookup to
|
||||
narrow the query window before scanning the logs table:
|
||||
1. Returns the matching log (narrow window, single bucket).
|
||||
1. Returns the matching logs (narrow window, single bucket), including a log
|
||||
flushed shortly after the span ends — kept by the configured padding.
|
||||
2. Does not return duplicate logs when the query window should span multiple
|
||||
exponential buckets (>1 h). But is clamped to the timerange of trace.
|
||||
exponential buckets (>1 h). The window is clamped to the trace's recorded
|
||||
range widened by the padding, so the post-span log survives the clamp.
|
||||
3. Returns no results when the query window does not contain the trace.
|
||||
4. Logs carrying a trace_id whose trace is NOT in trace_summary (e.g.
|
||||
traces disabled) are still returned — the lookup miss must not
|
||||
@@ -2366,6 +2368,9 @@ def test_logs_list_filter_by_trace_id(
|
||||
# Insert logs:
|
||||
# - one with the target trace_id, at a timestamp within the trace's
|
||||
# recorded window (now-10s..now-5s, padded ±1s).
|
||||
# - one with the target trace_id flushed ~3s AFTER the span's recorded end
|
||||
# (now-2s). This is outside the ±1s base pad but inside the multi-minute
|
||||
# log_trace_id_window_padding, so it must still be returned.
|
||||
# - one with an orphan trace_id whose trace was never ingested — used to
|
||||
# verify the lookup miss does NOT short-circuit logs queries.
|
||||
insert_logs(
|
||||
@@ -2379,6 +2384,15 @@ def test_logs_list_filter_by_trace_id(
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_root_span_id,
|
||||
),
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=2),
|
||||
resources=common_resources,
|
||||
attributes={"http.method": "POST"},
|
||||
body="log flushed after the span ends, within padding window",
|
||||
severity_text="INFO",
|
||||
trace_id=target_trace_id,
|
||||
span_id=target_root_span_id,
|
||||
),
|
||||
Logs(
|
||||
timestamp=now - timedelta(seconds=2),
|
||||
resources=common_resources,
|
||||
@@ -2429,23 +2443,31 @@ def test_logs_list_filter_by_trace_id(
|
||||
|
||||
now_ms = int(now.timestamp() * 1000)
|
||||
|
||||
inside_window_body = "log inside the target trace window"
|
||||
post_span_body = "log flushed after the span ends, within padding window"
|
||||
|
||||
# --- Test 1: narrow window (single bucket, <1 h) ---
|
||||
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
|
||||
narrow_rows, narrow_warnings = _query(narrow_start_ms, now_ms, target_trace_id)
|
||||
|
||||
assert len(narrow_rows) == 1, f"Expected 1 log for trace_id filter (narrow window), got {len(narrow_rows)}"
|
||||
assert narrow_rows[0]["data"]["trace_id"] == target_trace_id
|
||||
assert narrow_rows[0]["data"]["span_id"] == target_root_span_id
|
||||
assert len(narrow_rows) == 2, f"Expected 2 logs for trace_id filter (narrow window), got {len(narrow_rows)}"
|
||||
assert {r["data"]["trace_id"] for r in narrow_rows} == {target_trace_id}
|
||||
narrow_bodies = {r["data"]["body"] for r in narrow_rows}
|
||||
assert inside_window_body in narrow_bodies
|
||||
assert post_span_body in narrow_bodies, "post-span log should be returned within the padding window"
|
||||
assert not any(outside_range_msg in m for m in narrow_warnings), f"Did not expect outside-range warning, got {narrow_warnings}"
|
||||
|
||||
# --- Test 2: wide window (>1 h, clamp to the timerange from trace_summary) ---
|
||||
# Should still return exactly one log — no duplicates from multi-bucket scan.
|
||||
# --- Test 2: wide window (>1 h, clamp to the padded timerange from trace_summary) ---
|
||||
# Should return exactly the two target logs — no duplicates from multi-bucket
|
||||
# scan, and the post-span log survives the clamp only because of the padding.
|
||||
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
|
||||
wide_rows, wide_warnings = _query(wide_start_ms, now_ms, target_trace_id)
|
||||
|
||||
assert len(wide_rows) == 1, f"Expected 1 log for trace_id filter (wide window, multi-bucket), got {len(wide_rows)} — possible duplicate-log regression"
|
||||
assert wide_rows[0]["data"]["trace_id"] == target_trace_id
|
||||
assert wide_rows[0]["data"]["span_id"] == target_root_span_id
|
||||
assert len(wide_rows) == 2, f"Expected 2 logs for trace_id filter (wide window, multi-bucket), got {len(wide_rows)} — possible duplicate-log regression or padding not applied"
|
||||
assert {r["data"]["trace_id"] for r in wide_rows} == {target_trace_id}
|
||||
wide_bodies = {r["data"]["body"] for r in wide_rows}
|
||||
assert inside_window_body in wide_bodies
|
||||
assert post_span_body in wide_bodies, "post-span log should survive the clamp because of the padding"
|
||||
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
|
||||
|
||||
# --- Test 3: window that does not contain the trace returns no results + warning ---
|
||||
|
||||
@@ -15,7 +15,6 @@ from fixtures.querier import (
|
||||
build_builder_query,
|
||||
find_named_result,
|
||||
get_all_warnings,
|
||||
get_error_message,
|
||||
index_series_by_label,
|
||||
make_query_request,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user