Compare commits

...

8 Commits

Author SHA1 Message Date
Nikhil Soni
168b2eaa9c feat: query full spans for smaller traces 2026-05-27 00:12:38 +05:30
Nikhil Soni
6b613f18a3 feat: add api and module for flamegraph v3 2026-05-26 20:04:20 +05:30
Nikhil Soni
1b0447181d feat: add method to enrich selected spans 2026-05-26 20:03:47 +05:30
Nikhil Soni
20edff4771 feat: add config for flamegraph 2026-05-26 19:21:33 +05:30
Nikhil Soni
2048ef3d2f chore: remove limit from request payload
It's a new api so doesn't need to be backward compatible
2026-05-26 19:06:48 +05:30
Nikhil Soni
53c551359e feat: add types for flamegraph v3 in module structure 2026-05-26 18:56:35 +05:30
Nikhil Soni
1e326159b0 feat(tracedetail): add waterfall api with memory optimisations (#11450)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
* feat: add store methods for minimal trace fetch

* feat: break down waterfall module to handle large spans

Handling large traces in two steps to avoid high
memory allocation

* refactor: keep the waterfall changes in new api version

This is to avoid the contract change in existing v3

* chore: avoid unnecessary diffs

* refactor: move conversion logic to types

* chore: update openapi specs

* refactor: use sqlbuider for queries

* chore: fix comment

* chore: avoid passing request type to module

* refactor: avoid passing whole summary object around

* chore: remove trace_id from querying since its already known

* chore: remove unused reference column from query

* chore: update openapi specs
2026-05-26 10:11:16 +00:00
Nityananda Gohain
ceb1b4871b feat: trace based filters for logs, supporting aggregations as well (#11394)
* feat: trace based filters for logs, supporting aggregations as well

* fix: update comments

* fix: cleanup query from tests

* fix: address comments

* fix: address comments

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
2026-05-26 09:57:18 +00:00
20 changed files with 1545 additions and 71 deletions

View File

@@ -434,6 +434,17 @@ tracedetail:
max_depth_to_auto_expand: 5
# Threshold below which all spans are returned without windowing.
max_limit_to_select_all_spans: 10000
flamegraph:
# Maximum number of BFS depth levels included in a windowed response.
max_selected_levels: 50
# Maximum spans per level before sampling is applied.
max_spans_per_level: 100
# Number of highest-latency spans always included when sampling a level.
sampling_top_latency_count: 5
# Number of timestamp buckets used for uniform sampling within a level.
sampling_bucket_count: 50
# Threshold below which all spans are returned without windowing or sampling.
select_all_spans_limit: 100000
##################### Authz #################################
authz:

View File

@@ -18948,6 +18948,77 @@ paths:
summary: Get waterfall view for a trace
tags:
- tracedetail
/api/v4/traces/{traceID}/waterfall:
post:
deprecated: false
description: Returns the waterfall view of spans including all spans if total
spans are under a limit, a max count otherwise. Aggregations are dropped compared
to v3
operationId: GetWaterfallV4
parameters:
- in: path
name: traceID
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesPostableWaterfall'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesGettableWaterfallTrace'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get waterfall view for a trace
tags:
- tracedetail
/api/v5/query_range:
post:
deprecated: false

View File

@@ -9232,6 +9232,17 @@ export type GetWaterfall200 = {
status: string;
};
export type GetWaterfallV4PathParameters = {
traceID: string;
};
export type GetWaterfallV4200 = {
data: SpantypesGettableWaterfallTraceDTO;
/**
* @type string
*/
status: string;
};
export type QueryRangeV5200 = {
data: Querybuildertypesv5QueryRangeResponseDTO;
/**

View File

@@ -14,6 +14,8 @@ import type {
import type {
GetWaterfall200,
GetWaterfallPathParameters,
GetWaterfallV4200,
GetWaterfallV4PathParameters,
RenderErrorResponseDTO,
SpantypesPostableWaterfallDTO,
} from '../sigNoz.schemas';
@@ -120,3 +122,102 @@ export const useGetWaterfall = <
> => {
return useMutation(getGetWaterfallMutationOptions(options));
};
/**
* Returns the waterfall view of spans including all spans if total spans are under a limit, a max count otherwise. Aggregations are dropped compared to v3
* @summary Get waterfall view for a trace
*/
export const getWaterfallV4 = (
{ traceID }: GetWaterfallV4PathParameters,
spantypesPostableWaterfallDTO?: BodyType<SpantypesPostableWaterfallDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<GetWaterfallV4200>({
url: `/api/v4/traces/${traceID}/waterfall`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: spantypesPostableWaterfallDTO,
signal,
});
};
export const getGetWaterfallV4MutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof getWaterfallV4>>,
TError,
{
pathParams: GetWaterfallV4PathParameters;
data?: BodyType<SpantypesPostableWaterfallDTO>;
},
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof getWaterfallV4>>,
TError,
{
pathParams: GetWaterfallV4PathParameters;
data?: BodyType<SpantypesPostableWaterfallDTO>;
},
TContext
> => {
const mutationKey = ['getWaterfallV4'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof getWaterfallV4>>,
{
pathParams: GetWaterfallV4PathParameters;
data?: BodyType<SpantypesPostableWaterfallDTO>;
}
> = (props) => {
const { pathParams, data } = props ?? {};
return getWaterfallV4(pathParams, data);
};
return { mutationFn, ...mutationOptions };
};
export type GetWaterfallV4MutationResult = NonNullable<
Awaited<ReturnType<typeof getWaterfallV4>>
>;
export type GetWaterfallV4MutationBody =
| BodyType<SpantypesPostableWaterfallDTO>
| undefined;
export type GetWaterfallV4MutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Get waterfall view for a trace
*/
export const useGetWaterfallV4 = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof getWaterfallV4>>,
TError,
{
pathParams: GetWaterfallV4PathParameters;
data?: BodyType<SpantypesPostableWaterfallDTO>;
},
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof getWaterfallV4>>,
TError,
{
pathParams: GetWaterfallV4PathParameters;
data?: BodyType<SpantypesPostableWaterfallDTO>;
},
TContext
> => {
return useMutation(getGetWaterfallV4MutationOptions(options));
};

View File

@@ -29,5 +29,42 @@ func (provider *provider) addTraceDetailRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v4/traces/{traceID}/waterfall", handler.New(
provider.authzMiddleware.ViewAccess(provider.traceDetailHandler.GetWaterfallV4),
handler.OpenAPIDef{
ID: "GetWaterfallV4",
Tags: []string{"tracedetail"},
Summary: "Get waterfall view for a trace",
Description: "Returns the waterfall view of spans including all spans if total spans are under a limit, a max count otherwise. Aggregations are dropped compared to v3",
Request: new(spantypes.PostableWaterfall),
RequestContentType: "application/json",
Response: new(spantypes.GettableWaterfallTrace),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v3/traces/{traceID}/flamegraph", handler.New(
provider.authzMiddleware.ViewAccess(provider.traceDetailHandler.GetFlamegraph),
handler.OpenAPIDef{
ID: "GetFlamegraph",
Tags: []string{"tracedetail"},
Summary: "Get flamegraph view for a trace",
Description: "Returns the flamegraph view of spans for a given trace ID.",
Request: new(spantypes.PostableFlamegraph),
RequestContentType: "application/json",
Response: new(spantypes.GettableFlamegraphTrace),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -6,7 +6,16 @@ import (
)
type Config struct {
Waterfall WaterfallConfig `mapstructure:"waterfall"`
Waterfall WaterfallConfig `mapstructure:"waterfall"`
Flamegraph FlamegraphConfig `mapstructure:"flamegraph"`
}
type FlamegraphConfig struct {
MaxSelectedLevels int `mapstructure:"max_selected_levels"`
MaxSpansPerLevel int `mapstructure:"max_spans_per_level"`
SamplingTopLatencySpansCount int `mapstructure:"sampling_top_latency_count"`
SamplingBucketCount int `mapstructure:"sampling_bucket_count"`
SelectAllSpansLimit uint `mapstructure:"select_all_spans_limit"`
}
type WaterfallConfig struct {
@@ -29,6 +38,13 @@ func newConfig() factory.Config {
MaxDepthToAutoExpand: 5,
MaxLimitToSelectAllSpans: 10_000,
},
Flamegraph: FlamegraphConfig{
MaxSelectedLevels: 50,
MaxSpansPerLevel: 100,
SamplingTopLatencySpansCount: 5,
SamplingBucketCount: 50,
SelectAllSpansLimit: 100_000,
},
}
}
@@ -45,5 +61,25 @@ func (c Config) Validate() error {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.waterfall.max_limit_to_select_all_spans must be positive")
}
if c.Flamegraph.MaxSelectedLevels <= 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.flamegraph.level_limit must be positive, got %d", c.Flamegraph.MaxSelectedLevels)
}
if c.Flamegraph.MaxSpansPerLevel <= 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.flamegraph.spans_per_level must be positive, got %d", c.Flamegraph.MaxSpansPerLevel)
}
if c.Flamegraph.SamplingTopLatencySpansCount < 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.flamegraph.top_latency_count cannot be negative, got %d", c.Flamegraph.SamplingTopLatencySpansCount)
}
if c.Flamegraph.SamplingBucketCount <= 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.flamegraph.bucket_count must be positive, got %d", c.Flamegraph.SamplingBucketCount)
}
if c.Flamegraph.SelectAllSpansLimit == 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"tracedetail.flamegraph.max_limit_to_select_all_spans must be positive")
}
return nil
}

View File

@@ -38,3 +38,40 @@ func (h *handler) GetWaterfall(rw http.ResponseWriter, r *http.Request) {
render.Success(rw, http.StatusOK, result)
}
func (h *handler) GetWaterfallV4(rw http.ResponseWriter, r *http.Request) {
req := new(spantypes.PostableWaterfall)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
if err := req.Validate(); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.GetWaterfallV4(r.Context(), mux.Vars(r)["traceID"], req.SelectedSpanID, req.UncollapsedSpans, req.Limit)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}
func (h *handler) GetFlamegraph(rw http.ResponseWriter, r *http.Request) {
req := new(spantypes.PostableFlamegraph)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.GetFlamegraph(r.Context(), mux.Vars(r)["traceID"], req)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}

View File

@@ -2,6 +2,7 @@ package impltracedetail
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/tracedetail"
@@ -45,7 +46,7 @@ func (m *module) GetWaterfall(ctx context.Context, traceID string, req *spantype
return spantypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, uncollapsedSpans, selectedAllSpans, aggregationResults), nil
}
// getTraceData returns the waterfall cache for the given traceID with fallback on DB.
// getTraceData fetches all spans for a trace and builds the WaterfallTrace.
func (m *module) getTraceData(ctx context.Context, traceID string) (*spantypes.WaterfallTrace, error) {
summary, err := m.store.GetTraceSummary(ctx, traceID)
if err != nil {
@@ -61,6 +62,144 @@ func (m *module) getTraceData(ctx context.Context, traceID string) (*spantypes.W
return nil, spantypes.ErrTraceNotFound
}
traceData := spantypes.NewWaterfallTraceFromSpans(spanItems)
return traceData, nil
nodes := make([]*spantypes.WaterfallSpan, len(spanItems))
for i := range spanItems {
nodes[i] = spanItems[i].ToWaterfallSpan(traceID)
}
return spantypes.NewWaterfallTraceFromSpans(nodes), nil
}
// GetWaterfallV4 is the OOM-safe V4 waterfall.
// For large traces (NumSpans > effectiveLimit) it uses a two-step fetch:
// minimal fields for all spans to build the tree, then full fields for the
// visible window only. Aggregations are not returned.
func (m *module) GetWaterfallV4(ctx context.Context, traceID string, selectedSpanID string, uncollapsedSpans []string, selectAllLimit uint) (*spantypes.GettableWaterfallTrace, error) {
summary, err := m.store.GetTraceSummary(ctx, traceID)
if err != nil {
return nil, err
}
effectiveLimit := min(selectAllLimit, m.config.Waterfall.MaxLimitToSelectAllSpans)
if summary.NumSpans > uint64(effectiveLimit) {
return m.getWindowedWaterfall(ctx, traceID, selectedSpanID, uncollapsedSpans, summary.Start, summary.End)
}
return m.getFullWaterfall(ctx, traceID, summary)
}
func (m *module) getFullWaterfall(ctx context.Context, traceID string, summary *spantypes.TraceSummary) (*spantypes.GettableWaterfallTrace, error) {
spanItems, err := m.store.GetTraceSpans(ctx, traceID, summary)
if err != nil {
return nil, err
}
if len(spanItems) == 0 {
return nil, spantypes.ErrTraceNotFound
}
nodes := make([]*spantypes.WaterfallSpan, len(spanItems))
for i := range spanItems {
nodes[i] = spanItems[i].ToWaterfallSpan(traceID)
}
waterfallTrace := spantypes.NewWaterfallTraceFromSpans(nodes)
selectedSpans := waterfallTrace.GetAllSpans()
return spantypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, nil, true, nil), nil
}
func (m *module) GetFlamegraph(ctx context.Context, traceID string, req *spantypes.PostableFlamegraph) (*spantypes.GettableFlamegraphTrace, error) {
summary, err := m.store.GetTraceSummary(ctx, traceID)
if err != nil {
return nil, err
}
if summary.NumSpans <= uint64(m.config.Flamegraph.SelectAllSpansLimit) {
return m.getFullFlamegraph(ctx, traceID, summary)
}
return m.getWindowedFlamegraph(ctx, traceID, req.SelectedSpanID, summary)
}
func (m *module) getFullFlamegraph(ctx context.Context, traceID string, summary *spantypes.TraceSummary) (*spantypes.GettableFlamegraphTrace, error) {
fullSpans, err := m.store.GetTraceSpans(ctx, traceID, summary)
if err != nil {
return nil, err
}
if len(fullSpans) == 0 {
return nil, spantypes.ErrTraceNotFound
}
flamegraphTrace := spantypes.NewFlamegraphTraceFromStorable(fullSpans)
return spantypes.NewGettableFlamegraphTrace(
flamegraphTrace.GetAllLevels(),
summary.Start.UnixMilli(), summary.End.UnixMilli(), false,
), nil
}
// getWindowedFlamegraph returns a window of a max levels and max sampled spans per level around the selected span
func (m *module) getWindowedFlamegraph(ctx context.Context, traceID, selectedSpanID string, summary *spantypes.TraceSummary) (*spantypes.GettableFlamegraphTrace, error) {
minimalSpans, err := m.store.GetMinimalSpans(ctx, traceID, summary.Start, summary.End)
if err != nil {
return nil, err
}
if len(minimalSpans) == 0 {
return nil, spantypes.ErrTraceNotFound
}
flamegraphTrace := spantypes.NewFlamegraphTraceFromMinimal(minimalSpans)
minimalSpans = nil
cfg := m.config.Flamegraph
selectedSpans := flamegraphTrace.GetSelectedLevels(selectedSpanID,
cfg.MaxSelectedLevels, cfg.MaxSpansPerLevel, cfg.SamplingTopLatencySpansCount, cfg.SamplingBucketCount)
if len(selectedSpans) == 0 {
return nil, spantypes.ErrTraceNotFound
}
fullSpans, err := m.store.GetTraceSpansByIDs(ctx, traceID, summary.Start, summary.End,
spantypes.FlamegraphWindowSpanIDs(selectedSpans))
if err != nil {
return nil, err
}
return spantypes.NewGettableFlamegraphTrace(
flamegraphTrace.EnrichSelectedSpans(selectedSpans, fullSpans),
summary.Start.UnixMilli(), summary.End.UnixMilli(), true,
), nil
}
// getWindowedWaterfall builds the waterfall tree with minimal data and then returns only a window of full spans.
func (m *module) getWindowedWaterfall(ctx context.Context, traceID, selectedSpanID string, uncollapsedSpans []string, start, end time.Time) (*spantypes.GettableWaterfallTrace, error) {
// Step 1: minimal fetch → build full tree → select visible window
minimalSpans, err := m.store.GetMinimalSpans(ctx, traceID, start, end)
if err != nil {
return nil, err
}
if len(minimalSpans) == 0 {
return nil, spantypes.ErrTraceNotFound
}
nodes := make([]*spantypes.WaterfallSpan, len(minimalSpans))
for i := range minimalSpans {
nodes[i] = minimalSpans[i].ToWaterfallSpan(traceID)
}
waterfallTrace := spantypes.NewWaterfallTraceFromSpans(nodes)
selectedSpans, uncollapsedSpans := waterfallTrace.GetSelectedSpans(
uncollapsedSpans,
selectedSpanID,
m.config.Waterfall.SpanPageSize,
m.config.Waterfall.MaxDepthToAutoExpand,
)
// Step 2: full fetch for the selected window only
spanIDs := make([]string, len(selectedSpans))
for i, s := range selectedSpans {
spanIDs[i] = s.SpanID
}
fullSpans, err := m.store.GetTraceSpansByIDs(ctx, traceID, start, end, spanIDs)
if err != nil {
return nil, err
}
spantypes.EnrichSelectedSpans(selectedSpans, fullSpans)
return spantypes.NewGettableWaterfallTrace(
waterfallTrace, selectedSpans, uncollapsedSpans, false, nil,
), nil
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"time"
sqlbuilder "github.com/huandu/go-sqlbuilder"
@@ -12,6 +13,8 @@ import (
"github.com/SigNoz/signoz/pkg/types/spantypes"
)
const colServiceName = `resource_string_service$$$$name` // $ gets escaped so $$$$ converts to $$.
type traceStore struct {
telemetryStore telemetrystore.TelemetryStore
}
@@ -45,8 +48,8 @@ func (s *traceStore) GetTraceSpans(ctx context.Context, traceID string, summary
// DISTINCT ON (span_id) is ClickHouse-specific syntax not supported by sqlbuilder
query := fmt.Sprintf(`
SELECT DISTINCT ON (span_id)
timestamp, duration_nano, span_id, trace_id, has_error, kind,
resource_string_service$$name, name, links as references,
timestamp, duration_nano, span_id, has_error, kind,
resource_string_service$$name, name,
attributes_string, attributes_number, attributes_bool, resources_string,
events, status_message, status_code_string, kind_string, parent_span_id,
flags, is_remote, trace_state, status_code,
@@ -69,3 +72,64 @@ func (s *traceStore) GetTraceSpans(ctx context.Context, traceID string, summary
}
return spanItems, nil
}
func (s *traceStore) GetMinimalSpans(ctx context.Context, traceID string, start, end time.Time) ([]spantypes.MinimalSpan, error) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select(
"DISTINCT ON (span_id) span_id",
"parent_span_id", "timestamp", "duration_nano", "has_error",
colServiceName,
)
sb.From(fmt.Sprintf("%s.%s", spantypes.TraceDB, spantypes.TraceTable))
sb.Where(
sb.E("trace_id", traceID),
sb.GE("ts_bucket_start", start.Unix()-1800),
sb.LE("ts_bucket_start", end.Unix()),
)
sb.OrderByAsc("timestamp")
sb.OrderByAsc("name")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var spans []spantypes.MinimalSpan
if err := s.telemetryStore.ClickhouseDB().Select(ctx, &spans, query, args...); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error querying minimal spans")
}
return spans, nil
}
func (s *traceStore) GetTraceSpansByIDs(ctx context.Context, traceID string, start, end time.Time, spanIDs []string) ([]spantypes.StorableSpan, error) {
if len(spanIDs) == 0 {
return []spantypes.StorableSpan{}, nil
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select(
"DISTINCT ON (span_id) timestamp",
"duration_nano", "span_id", "has_error", "kind",
colServiceName, "name",
"attributes_string", "attributes_number", "attributes_bool", "resources_string",
"events", "status_message", "status_code_string", "kind_string", "parent_span_id",
"flags", "is_remote", "trace_state", "status_code",
"db_name", "db_operation", "http_method", "http_url", "http_host",
"external_http_method", "external_http_url", "response_status_code",
)
sb.From(fmt.Sprintf("%s.%s", spantypes.TraceDB, spantypes.TraceTable))
ids := make([]any, len(spanIDs))
for i, id := range spanIDs {
ids[i] = id
}
sb.Where(
sb.E("trace_id", traceID),
sb.In("span_id", ids...),
sb.GE("ts_bucket_start", start.Unix()-1800),
sb.LE("ts_bucket_start", end.Unix()),
)
sb.OrderByAsc("timestamp")
sb.OrderByAsc("name")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var spans []spantypes.StorableSpan
if err := s.telemetryStore.ClickhouseDB().Select(ctx, &spans, query, args...); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error querying trace spans by IDs")
}
return spans, nil
}

View File

@@ -10,9 +10,13 @@ import (
// Handler exposes HTTP handlers for trace detail APIs.
type Handler interface {
GetWaterfall(http.ResponseWriter, *http.Request)
GetWaterfallV4(http.ResponseWriter, *http.Request)
GetFlamegraph(http.ResponseWriter, *http.Request)
}
// Module defines the business logic for trace detail operations.
type Module interface {
GetWaterfall(ctx context.Context, traceID string, req *spantypes.PostableWaterfall) (*spantypes.GettableWaterfallTrace, error)
GetWaterfallV4(ctx context.Context, traceID string, selectedSpanID string, uncollapsedSpans []string, selectAllLimit uint) (*spantypes.GettableWaterfallTrace, error)
GetFlamegraph(ctx context.Context, traceID string, req *spantypes.PostableFlamegraph) (*spantypes.GettableFlamegraphTrace, error)
}

View File

@@ -19,6 +19,8 @@ import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const traceOutsideRangeWarn = "Query %s references a trace_id that exists between %s and %s (UTC) but lies outside the selected time range; adjust the time range to see results"
type builderQuery[T any] struct {
logger *slog.Logger
telemetryStore telemetrystore.TelemetryStore
@@ -199,7 +201,21 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
return q.executeWindowList(ctx)
}
stmt, err := q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec, q.variables)
fromMS, toMS := q.fromMS, q.toMS
if q.spec.Signal == telemetrytypes.SignalTraces || q.spec.Signal == telemetrytypes.SignalLogs {
var overlap bool
var warning string
fromMS, toMS, overlap, warning = q.narrowWindowByTraceID(ctx, fromMS, toMS)
if !overlap {
res := emptyResultFor(q.kind, q.spec.Name)
if warning != "" {
res.Warnings = []string{warning}
}
return res, nil
}
}
stmt, err := q.stmtBuilder.Build(ctx, fromMS, toMS, q.kind, q.spec, q.variables)
if err != nil {
return nil, err
}
@@ -215,6 +231,88 @@ func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error)
return result, nil
}
// narrowWindowByTraceID inspects the filter for trace_id predicates and clamps
// [fromMS,toMS] to the time range stored in signoz_traces.distributed_trace_summary.
// Returns the (possibly narrowed) window, overlap=false when the trace lies
// completely outside the query window (callers should short-circuit), and a
// warning string the caller should attach to the empty result when the trace
// exists but is outside the selected window.
//
// When the trace_id is not present in trace_summary the behaviour differs by
// signal:
// - traces: trace_summary is derived from the spans table, so a missing row
// means no spans exist for that trace_id; we short-circuit to empty.
// - logs: logs can carry a trace_id even when traces are not ingested at all
// (e.g. traces disabled). We must not short-circuit; instead leave the
// window untouched and let the query run.
func (q *builderQuery[T]) narrowWindowByTraceID(ctx context.Context, fromMS, toMS uint64) (uint64, uint64, bool, string) {
if q.spec.Filter == nil || q.spec.Filter.Expression == "" {
return fromMS, toMS, true, ""
}
traceIDs, found := telemetrytraces.ExtractTraceIDsFromFilter(q.spec.Filter.Expression)
if !found || len(traceIDs) == 0 {
return fromMS, toMS, true, ""
}
finder := telemetrytraces.NewTraceTimeRangeFinder(q.telemetryStore)
traceStart, traceEnd, exists, err := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
if err != nil {
return fromMS, toMS, true, ""
}
if !exists {
if q.spec.Signal == telemetrytypes.SignalTraces {
q.logger.DebugContext(ctx, "trace_id not found in trace_summary; short-circuiting traces query to empty",
slog.Any("trace_ids", traceIDs))
return fromMS, toMS, false, ""
}
q.logger.DebugContext(ctx, "trace_id not found in trace_summary; leaving time range untouched for logs",
slog.Any("trace_ids", traceIDs))
return fromMS, toMS, true, ""
}
traceStartMS := uint64(traceStart) / 1_000_000
traceEndMS := uint64(traceEnd) / 1_000_000
if traceStartMS == 0 || traceEndMS == 0 {
return fromMS, toMS, true, ""
}
if traceStartMS > toMS || traceEndMS < fromMS {
traceStartUTC := time.UnixMilli(int64(traceStartMS)).UTC().Format(time.RFC3339)
traceEndUTC := time.UnixMilli(int64(traceEndMS)).UTC().Format(time.RFC3339)
return fromMS, toMS, false, fmt.Sprintf(traceOutsideRangeWarn, q.spec.Name, traceStartUTC, traceEndUTC)
}
if traceStartMS > fromMS {
fromMS = traceStartMS
}
if traceEndMS < toMS {
toMS = traceEndMS
}
q.logger.DebugContext(ctx, "optimized time range using trace_id lookup",
slog.String("signal", q.spec.Signal.StringValue()),
slog.Any("trace_ids", traceIDs),
slog.Uint64("start", fromMS),
slog.Uint64("end", toMS))
return fromMS, toMS, true, ""
}
// emptyResultFor returns an empty result payload appropriate for the given kind.
func emptyResultFor(kind qbtypes.RequestType, queryName string) *qbtypes.Result {
var value any
switch kind {
case qbtypes.RequestTypeTimeSeries:
value = &qbtypes.TimeSeriesData{QueryName: queryName}
case qbtypes.RequestTypeScalar:
value = &qbtypes.ScalarData{QueryName: queryName}
default:
value = &qbtypes.RawData{QueryName: queryName}
}
return &qbtypes.Result{
Type: kind,
Value: value,
}
}
// executeWithContext executes the query with query window and step context for partial value detection.
func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string, args []any) (*qbtypes.Result, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
@@ -310,42 +408,27 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
totalBytes := uint64(0)
start := time.Now()
// Check if filter contains trace_id(s) and optimize time range if needed
if q.spec.Signal == telemetrytypes.SignalTraces &&
q.spec.Filter != nil && q.spec.Filter.Expression != "" {
traceIDs, found := telemetrytraces.ExtractTraceIDsFromFilter(q.spec.Filter.Expression)
if found && len(traceIDs) > 0 {
finder := telemetrytraces.NewTraceTimeRangeFinder(q.telemetryStore)
traceStart, traceEnd, ok := finder.GetTraceTimeRangeMulti(ctx, traceIDs)
traceStartMS := uint64(traceStart) / 1_000_000
traceEndMS := uint64(traceEnd) / 1_000_000
if !ok {
q.logger.DebugContext(ctx, "failed to get trace time range", slog.Any("trace_ids", traceIDs))
} else if traceStartMS > 0 && traceEndMS > 0 {
// no overlap — nothing to return
if uint64(traceStartMS) > toMS || uint64(traceEndMS) < fromMS {
return &qbtypes.Result{
Type: qbtypes.RequestTypeRaw,
Value: &qbtypes.RawData{
QueryName: q.spec.Name,
},
Stats: qbtypes.ExecStats{
DurationMS: uint64(time.Since(start).Milliseconds()),
},
}, nil
}
// clamp window to trace time range before bucketing
if uint64(traceStartMS) > fromMS {
fromMS = uint64(traceStartMS)
}
if uint64(traceEndMS) < toMS {
toMS = uint64(traceEndMS)
}
q.logger.DebugContext(ctx, "optimized time range for traces", slog.Any("trace_ids", traceIDs), slog.Uint64("start", fromMS), slog.Uint64("end", toMS))
// Check if filter contains trace_id(s) and optimize time range if needed.
// Applies to both traces (the listing this branch was built for) and logs
// (which carry trace_id and benefit from the same clamp before bucketing).
if q.spec.Signal == telemetrytypes.SignalTraces || q.spec.Signal == telemetrytypes.SignalLogs {
var overlap bool
var warning string
fromMS, toMS, overlap, warning = q.narrowWindowByTraceID(ctx, fromMS, toMS)
if !overlap {
res := &qbtypes.Result{
Type: qbtypes.RequestTypeRaw,
Value: &qbtypes.RawData{
QueryName: q.spec.Name,
},
Stats: qbtypes.ExecStats{
DurationMS: uint64(time.Since(start).Milliseconds()),
},
}
if warning != "" {
res.Warnings = []string{warning}
}
return res, nil
}
}

View File

@@ -21,19 +21,19 @@ func NewTraceTimeRangeFinder(telemetryStore telemetrystore.TelemetryStore) *Trac
}
}
func (f *TraceTimeRangeFinder) GetTraceTimeRange(ctx context.Context, traceID string) (startNano, endNano int64, ok bool) {
func (f *TraceTimeRangeFinder) GetTraceTimeRange(ctx context.Context, traceID string) (startNano, endNano int64, exists bool, error error) {
traceIDs := []string{traceID}
return f.GetTraceTimeRangeMulti(ctx, traceIDs)
}
func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, traceIDs []string) (startNano, endNano int64, ok bool) {
func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, traceIDs []string) (startNano, endNano int64, exists bool, error error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalTraces.StringValue(),
instrumentationtypes.CodeNamespace: "trace-time-range",
instrumentationtypes.CodeFunctionName: "GetTraceTimeRangeMulti",
})
if len(traceIDs) == 0 {
return 0, 0, false
return 0, 0, false, nil
}
cleanedIDs := make([]string, len(traceIDs))
@@ -49,7 +49,8 @@ func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, trace
}
query := fmt.Sprintf(`
SELECT
SELECT
count(),
toUnixTimestamp64Nano(min(start)),
toUnixTimestamp64Nano(max(end))
FROM %s.%s
@@ -58,9 +59,14 @@ func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, trace
row := f.telemetryStore.ClickhouseDB().QueryRow(ctx, query, args...)
err := row.Scan(&startNano, &endNano)
var rowCount uint64
err := row.Scan(&rowCount, &startNano, &endNano)
if err != nil {
return 0, 0, false
return 0, 0, false, err
}
if rowCount == 0 {
return 0, 0, false, nil
}
if startNano > 1_000_000_000 {
@@ -68,5 +74,5 @@ func (f *TraceTimeRangeFinder) GetTraceTimeRangeMulti(ctx context.Context, trace
}
endNano += 1_000_000_000
return startNano, endNano, true
return startNano, endNano, true, nil
}

View File

@@ -43,7 +43,7 @@ func TestGetTraceTimeRangeMulti(t *testing.T) {
finder := &TraceTimeRangeFinder{telemetryStore: nil}
if !tt.expectOK {
_, _, ok := finder.GetTraceTimeRangeMulti(ctx, tt.traceIDs)
_, _, ok, _ := finder.GetTraceTimeRangeMulti(ctx, tt.traceIDs)
assert.False(t, ok)
}
})

View File

@@ -0,0 +1,81 @@
package spantypes
import (
"maps"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
type FlamegraphSpan struct {
SpanID string `json:"spanId"`
ParentSpanID string `json:"parentSpanId"`
Timestamp uint64 `json:"timestamp"`
DurationNano uint64 `json:"durationNano"`
HasError bool `json:"hasError"`
ServiceName string `json:"serviceName"`
Name string `json:"name"`
Level int64 `json:"level"`
Events []Event `json:"event"`
Attributes map[string]any `json:"attributes,omitempty"`
Resource map[string]string `json:"resource,omitempty"`
Children []*FlamegraphSpan `json:"-"` // internal tree use only
}
// FlamegraphLevel groups span IDs at a single level within the selected window.
type FlamegraphLevel struct {
Level int64
SpanIDs []string
}
type PostableFlamegraph struct {
SelectedSpanID string `json:"selectedSpanId"`
SelectFields []telemetrytypes.TelemetryFieldKey `json:"selectFields,omitempty"`
}
// GettableFlamegraphTrace is the response for the v3 flamegraph API.
type GettableFlamegraphTrace struct {
Spans [][]*FlamegraphSpan `json:"spans"`
StartTimestampMillis int64 `json:"startTimestampMillis"`
EndTimestampMillis int64 `json:"endTimestampMillis"`
HasMore bool `json:"hasMore"`
}
func NewGettableFlamegraphTrace(spans [][]*FlamegraphSpan, startMs, endMs int64, hasMore bool) *GettableFlamegraphTrace {
return &GettableFlamegraphTrace{
Spans: spans,
StartTimestampMillis: startMs,
EndTimestampMillis: endMs,
HasMore: hasMore,
}
}
func NewFlamegraphSpanFromStorable(s *StorableSpan, level int64) *FlamegraphSpan {
resources := make(map[string]string, len(s.ResourcesString))
maps.Copy(resources, s.ResourcesString)
return &FlamegraphSpan{
SpanID: s.SpanID,
ParentSpanID: s.ParentSpanID,
Timestamp: uint64(s.StartTime.UnixNano()),
DurationNano: s.DurationNano,
HasError: s.HasError,
ServiceName: s.ServiceName,
Name: s.Name,
Level: level,
Events: s.UnmarshalledEvents(),
Attributes: s.Attributes(),
Resource: resources,
}
}
// FlamegraphWindowSpanIDs collects all span IDs from a level window into a flat slice.
func FlamegraphWindowSpanIDs(window []FlamegraphLevel) []string {
total := 0
for _, lvl := range window {
total += len(lvl.SpanIDs)
}
ids := make([]string, 0, total)
for _, lvl := range window {
ids = append(ids, lvl.SpanIDs...)
}
return ids
}

View File

@@ -0,0 +1,279 @@
package spantypes
import (
"sort"
)
// FlamegraphTrace holds the level wise tree built from minimal spans.
type FlamegraphTrace struct {
roots []*FlamegraphSpan
nodeByID map[string]*FlamegraphSpan
startTime uint64
endTime uint64
}
func NewFlamegraphTraceFromMinimal(spans []MinimalSpan) *FlamegraphTrace {
t := &FlamegraphTrace{
nodeByID: make(map[string]*FlamegraphSpan, len(spans)),
}
for i := range spans {
node := spans[i].ToFlamegraphSpan()
t.updateTimeRange(node.Timestamp, node.DurationNano)
t.nodeByID[node.SpanID] = node
}
t.wireTree()
return t
}
func NewFlamegraphTraceFromStorable(spans []StorableSpan) *FlamegraphTrace {
t := &FlamegraphTrace{
nodeByID: make(map[string]*FlamegraphSpan, len(spans)),
}
for i := range spans {
node := NewFlamegraphSpanFromStorable(&spans[i], 0) // level is set later by BFS
t.updateTimeRange(node.Timestamp, node.DurationNano)
t.nodeByID[node.SpanID] = node
}
t.wireTree()
return t
}
func (t *FlamegraphTrace) GetAllLevels() [][]*FlamegraphSpan {
allLevels := t.buildAllLevels()
for _, node := range t.nodeByID {
node.Children = nil // children not required after building tree
}
return allLevels
}
// GetSelectedLevels returns the level window for selectedSpanID with sampling applied to
// dense levels. It always applies windowing — callers should only invoke this when the
// trace is known to exceed the select-all limit.
// Children are cleared after traversal so the tree can be GC'd.
func (t *FlamegraphTrace) GetSelectedLevels(
selectedSpanID string,
levelLimit, spansPerLevel, topLatencyCount, bucketCount int,
) []FlamegraphLevel {
allLevels := t.buildAllLevels()
for _, node := range t.nodeByID {
node.Children = nil
}
selectedIndex := 0
if selectedSpanID != "" {
outer:
for i, lvl := range allLevels {
for _, span := range lvl {
if span.SpanID == selectedSpanID {
selectedIndex = i
break outer
}
}
}
}
lowerLimit := selectedIndex - int(float64(levelLimit)*0.4)
upperLimit := selectedIndex + int(float64(levelLimit)*0.6)
if lowerLimit < 0 {
upperLimit -= lowerLimit
lowerLimit = 0
}
if upperLimit > len(allLevels) {
lowerLimit -= upperLimit - len(allLevels)
upperLimit = len(allLevels)
}
if lowerLimit < 0 {
lowerLimit = 0
}
result := make([]FlamegraphLevel, 0, upperLimit-lowerLimit)
for i := lowerLimit; i < upperLimit; i++ {
lvl := allLevels[i]
if len(lvl) == 0 {
continue
}
var sampled []*FlamegraphSpan
if len(lvl) > spansPerLevel {
sampled = sampleFlamegraphLevel(lvl, selectedSpanID, i == selectedIndex,
t.startTime, t.endTime, topLatencyCount, bucketCount)
} else {
sampled = lvl
}
if len(sampled) == 0 {
continue
}
spanIDs := make([]string, len(sampled))
for j, s := range sampled {
spanIDs[j] = s.SpanID
}
result = append(result, FlamegraphLevel{
Level: sampled[0].Level,
SpanIDs: spanIDs,
})
}
return result
}
func (t *FlamegraphTrace) EnrichSelectedSpans(selectedSpans []FlamegraphLevel, fullSpans []StorableSpan) [][]*FlamegraphSpan {
fullByID := make(map[string]*StorableSpan, len(fullSpans))
for i := range fullSpans {
fullByID[fullSpans[i].SpanID] = &fullSpans[i]
}
result := make([][]*FlamegraphSpan, len(selectedSpans))
for i, lvl := range selectedSpans {
result[i] = make([]*FlamegraphSpan, 0, len(lvl.SpanIDs))
for _, spanID := range lvl.SpanIDs {
if full, ok := fullByID[spanID]; ok {
result[i] = append(result[i], NewFlamegraphSpanFromStorable(full, lvl.Level))
} else if lean, ok := t.nodeByID[spanID]; ok {
result[i] = append(result[i], lean)
}
}
}
return result
}
func (t *FlamegraphTrace) updateTimeRange(timestamp, durationNano uint64) {
if t.startTime == 0 || timestamp < t.startTime {
t.startTime = timestamp
}
if end := timestamp + durationNano; end > t.endTime {
t.endTime = end
}
}
func (t *FlamegraphTrace) wireTree() {
for _, node := range t.nodeByID {
if node.ParentSpanID != "" {
if parent, ok := t.nodeByID[node.ParentSpanID]; ok {
parent.Children = append(parent.Children, node)
} else {
missing := &FlamegraphSpan{
SpanID: node.ParentSpanID,
Name: "Missing Span",
Timestamp: node.Timestamp,
DurationNano: node.DurationNano,
Children: []*FlamegraphSpan{node},
}
t.nodeByID[missing.SpanID] = missing
t.roots = append(t.roots, missing)
}
} else if flamegraphSpanIndex(t.roots, node.SpanID) == -1 {
t.roots = append(t.roots, node)
}
}
sort.Slice(t.roots, func(i, j int) bool {
if t.roots[i].Timestamp == t.roots[j].Timestamp {
return t.roots[i].SpanID < t.roots[j].SpanID
}
return t.roots[i].Timestamp < t.roots[j].Timestamp
})
}
func (t *FlamegraphTrace) buildAllLevels() [][]*FlamegraphSpan {
var result [][]*FlamegraphSpan
type entry struct {
node *FlamegraphSpan
depth int64
}
for _, root := range t.roots {
levelMap := make(map[int64][]*FlamegraphSpan)
maxDepth := int64(-1)
queue := []entry{{root, 0}}
for len(queue) > 0 {
curr := queue[0]
queue = queue[1:]
curr.node.Level = curr.depth
levelMap[curr.depth] = append(levelMap[curr.depth], curr.node)
if curr.depth > maxDepth {
maxDepth = curr.depth
}
for _, child := range curr.node.Children {
queue = append(queue, entry{child, curr.depth + 1})
}
}
for depth := int64(0); depth <= maxDepth; depth++ {
if spans, ok := levelMap[depth]; ok {
result = append(result, spans)
}
}
}
return result
}
func sampleFlamegraphLevel(
spans []*FlamegraphSpan,
selectedSpanID string,
isSelectedLevel bool,
startTime, endTime uint64,
topLatencyCount, bucketCount int,
) []*FlamegraphSpan {
sorted := make([]*FlamegraphSpan, len(spans))
copy(sorted, spans)
sort.Slice(sorted, func(i, j int) bool {
return sorted[i].DurationNano > sorted[j].DurationNano
})
var sampled []*FlamegraphSpan
topK := topLatencyCount
if topK > len(sorted) {
topK = len(sorted)
}
sampled = append(sampled, sorted[:topK]...)
if isSelectedLevel {
for _, span := range sorted {
if span.SpanID == selectedSpanID {
sampled = append(sampled, span)
break
}
}
}
bucketSize := (endTime - startTime) / uint64(bucketCount)
if bucketSize == 0 {
bucketSize = 1
}
buckets := make([][]*FlamegraphSpan, bucketCount)
for _, span := range sorted {
if span.Timestamp < startTime || span.Timestamp > endTime {
continue
}
idx := int((span.Timestamp - startTime) / bucketSize)
if idx < 0 {
idx = 0
} else if idx >= bucketCount {
idx = bucketCount - 1
}
buckets[idx] = append(buckets[idx], span)
}
for i := range buckets {
if len(buckets[i]) > 2 {
buckets[i] = buckets[i][:2]
}
}
for _, bucket := range buckets {
sampled = append(sampled, bucket...)
}
return sampled
}
func flamegraphSpanIndex(spans []*FlamegraphSpan, spanID string) int {
for i, s := range spans {
if s != nil && s.SpanID == spanID {
return i
}
}
return -1
}

View File

@@ -2,6 +2,7 @@ package spantypes
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -26,4 +27,6 @@ type SpanMapperStore interface {
type TraceStore interface {
GetTraceSummary(ctx context.Context, traceID string) (*TraceSummary, error)
GetTraceSpans(ctx context.Context, traceID string, summary *TraceSummary) ([]StorableSpan, error)
GetMinimalSpans(ctx context.Context, traceID string, start, end time.Time) ([]MinimalSpan, error)
GetTraceSpansByIDs(ctx context.Context, traceID string, start, end time.Time, spanIDs []string) ([]StorableSpan, error)
}

View File

@@ -103,12 +103,10 @@ type StorableSpan struct {
StartTime time.Time `ch:"timestamp"`
DurationNano uint64 `ch:"duration_nano"`
SpanID string `ch:"span_id"`
TraceID string `ch:"trace_id"`
HasError bool `ch:"has_error"`
Kind int8 `ch:"kind"`
ServiceName string `ch:"resource_string_service$$name"`
Name string `ch:"name"`
References string `ch:"references"`
AttributesString map[string]string `ch:"attributes_string"`
AttributesNumber map[string]float64 `ch:"attributes_number"`
AttributesBool map[string]bool `ch:"attributes_bool"`
@@ -132,6 +130,44 @@ type StorableSpan struct {
ResponseStatusCode string `ch:"response_status_code"`
}
// MinimalSpan with only the fields needed to build the parent-child tree.
type MinimalSpan struct {
SpanID string `ch:"span_id"`
ParentSpanID string `ch:"parent_span_id"`
StartTime time.Time `ch:"timestamp"`
DurationNano uint64 `ch:"duration_nano"`
HasError bool `ch:"has_error"`
ServiceName string `ch:"resource_string_service$$name"`
}
func (item *MinimalSpan) ToWaterfallSpan(traceID string) *WaterfallSpan {
return &WaterfallSpan{
SpanID: item.SpanID,
TraceID: traceID,
ParentSpanID: item.ParentSpanID,
TimeUnix: uint64(item.StartTime.UnixNano()),
DurationNano: item.DurationNano,
HasError: item.HasError,
ServiceName: item.ServiceName,
Resource: map[string]string{"service.name": item.ServiceName},
Children: make([]*WaterfallSpan, 0),
Attributes: make(map[string]any),
Events: make([]Event, 0),
}
}
func (item *MinimalSpan) ToFlamegraphSpan() *FlamegraphSpan {
return &FlamegraphSpan{
SpanID: item.SpanID,
ParentSpanID: item.ParentSpanID,
Timestamp: uint64(item.StartTime.UnixNano()),
DurationNano: item.DurationNano,
HasError: item.HasError,
ServiceName: item.ServiceName,
Children: make([]*FlamegraphSpan, 0),
}
}
// NewMissingWaterfallSpan creates a synthetic placeholder span for a parent that has no recorded data.
func NewMissingWaterfallSpan(spanID, traceID string, timeUnixNano, durationNano uint64) *WaterfallSpan {
return &WaterfallSpan{
@@ -261,7 +297,7 @@ func (item *StorableSpan) UnmarshalledEvents() []Event {
return events
}
func (item *StorableSpan) ToWaterfallSpan() *WaterfallSpan {
func (item *StorableSpan) ToWaterfallSpan(traceID string) *WaterfallSpan {
resources := make(map[string]string)
maps.Copy(resources, item.ResourcesString)
@@ -289,7 +325,7 @@ func (item *StorableSpan) ToWaterfallSpan() *WaterfallSpan {
StatusCode: item.StatusCode,
StatusCodeString: item.StatusCodeString,
StatusMessage: item.StatusMessage,
TraceID: item.TraceID,
TraceID: traceID,
TraceState: item.TraceState,
Children: make([]*WaterfallSpan, 0),
TimeUnix: uint64(item.StartTime.UnixNano()),
@@ -297,6 +333,24 @@ func (item *StorableSpan) ToWaterfallSpan() *WaterfallSpan {
}
}
func EnrichSelectedSpans(window []*WaterfallSpan, fullSpans []StorableSpan) {
fullByID := make(map[string]*StorableSpan, len(fullSpans))
for i := range fullSpans {
fullByID[fullSpans[i].SpanID] = &fullSpans[i]
}
for i, ws := range window {
full, ok := fullByID[ws.SpanID]
if !ok {
continue // synthesized MissingSpan — keep empty shell
}
newWS := full.ToWaterfallSpan(ws.TraceID)
newWS.Level = ws.Level
newWS.HasChildren = ws.HasChildren
newWS.SubTreeNodeCount = ws.SubTreeNodeCount
window[i] = newWS
}
}
// getSpanIndex returns the index of matched span and -1 for no match.
func getSpanIndex(spans []*WaterfallSpan, targetSpanID string) int {
for i, s := range spans {

View File

@@ -62,26 +62,24 @@ func NewWaterfallTrace(
}
}
func NewWaterfallTraceFromSpans(spans []StorableSpan) *WaterfallTrace {
// NewWaterfallTraceFromSpans requires WaterfallSpan nodes with only below fields:
// SpanID, ParentSpanID, TimeUnix, DurationNano, HasError, and ServiceName.
func NewWaterfallTraceFromSpans(nodes []*WaterfallSpan) *WaterfallTrace {
var (
startTime, endTime, totalErrorSpans uint64
spanIDToSpanNodeMap = make(map[string]*WaterfallSpan, len(spans))
spanIDToSpanNodeMap = make(map[string]*WaterfallSpan, len(nodes))
traceRoots []*WaterfallSpan
hasMissingSpans bool
)
for _, item := range spans {
span := item.ToWaterfallSpan()
startTimeUnixNano := uint64(item.StartTime.UnixNano())
if startTime == 0 || startTimeUnixNano < startTime {
startTime = startTimeUnixNano
for _, span := range nodes {
if startTime == 0 || span.TimeUnix < startTime {
startTime = span.TimeUnix
}
endTime = max(endTime, startTimeUnixNano+span.DurationNano)
endTime = max(endTime, span.TimeUnix+span.DurationNano)
if span.HasError {
totalErrorSpans++
}
spanIDToSpanNodeMap[span.SpanID] = span
}
@@ -116,7 +114,7 @@ func NewWaterfallTraceFromSpans(spans []StorableSpan) *WaterfallTrace {
return NewWaterfallTrace(
startTime,
endTime,
uint64(len(spans)),
uint64(len(nodes)),
totalErrorSpans,
spanIDToSpanNodeMap,
traceRoots,

View File

@@ -20,6 +20,7 @@ from fixtures.querier import (
index_series_by_label,
make_query_request,
)
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
def test_logs_list(
@@ -2293,3 +2294,334 @@ def test_logs_formula_orderby_and_limit(
assert len(f3_services) == 3, f"F3: expected 3 rows after limit, got {len(f3_services)}"
assert f3_values == f4_values[:3], f"F3 values {f3_values} do not match F4[:3] values {f4_values[:3]}"
assert set(f3_services) == set(f4_services[:3]), f"F3 services {f3_services} do not match F4[:3] services {f4_services[:3]}"
def test_logs_list_filter_by_trace_id(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[list[Logs]], None],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Tests that filtering logs by trace_id uses the trace_summary lookup to
narrow the query window before scanning the logs table:
1. Returns the matching log (narrow window, single bucket).
2. Does not return duplicate logs when the query window should span multiple
exponential buckets (>1 h). But is clamped to the timerange of trace.
3. Returns no results when the query window does not contain the trace.
4. Logs carrying a trace_id whose trace is NOT in trace_summary (e.g.
traces disabled) are still returned — the lookup miss must not
short-circuit logs queries.
"""
target_trace_id = TraceIdGenerator.trace_id()
orphan_trace_id = TraceIdGenerator.trace_id()
target_root_span_id = TraceIdGenerator.span_id()
target_child_span_id = TraceIdGenerator.span_id()
orphan_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "logs-trace-filter-service",
"cloud.provider": "integration",
}
# Populate signoz_traces.distributed_trace_summary by inserting spans for
# the target trace_id. trace_summary records min/max of span timestamps
# (it ignores span duration), so two spans are inserted to give the trace
# a non-trivial recorded window of [now-10s, now-5s].
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_root_span_id,
parent_span_id="",
name="root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
Traces(
timestamp=now - timedelta(seconds=5),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_child_span_id,
parent_span_id=target_root_span_id,
name="child-span",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
# Insert logs:
# - one with the target trace_id, at a timestamp within the trace's
# recorded window (now-10s..now-5s, padded ±1s).
# - one with an orphan trace_id whose trace was never ingested — used to
# verify the lookup miss does NOT short-circuit logs queries.
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=7),
resources=common_resources,
attributes={"http.method": "GET"},
body="log inside the target trace window",
severity_text="INFO",
trace_id=target_trace_id,
span_id=target_root_span_id,
),
Logs(
timestamp=now - timedelta(seconds=2),
resources=common_resources,
attributes={"http.method": "PUT"},
body="log with a trace_id absent from trace_summary",
severity_text="INFO",
trace_id=orphan_trace_id,
span_id=orphan_span_id,
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
def _query(start_ms: int, end_ms: int, trace_id: str) -> tuple[list, list[str]]:
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="raw",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"disabled": False,
"limit": 100,
"offset": 0,
"filter": {"expression": f"trace_id = '{trace_id}'"},
"order": [
{"key": {"name": "timestamp"}, "direction": "desc"},
{"key": {"name": "id"}, "direction": "desc"},
],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
rows = response.json()["data"]["data"]["results"][0]["rows"] or []
warning = (response.json().get("data") or {}).get("warning") or {}
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
return rows, messages
outside_range_msg = "lies outside the selected time range"
now_ms = int(now.timestamp() * 1000)
# --- Test 1: narrow window (single bucket, <1 h) ---
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
narrow_rows, narrow_warnings = _query(narrow_start_ms, now_ms, target_trace_id)
assert len(narrow_rows) == 1, f"Expected 1 log for trace_id filter (narrow window), got {len(narrow_rows)}"
assert narrow_rows[0]["data"]["trace_id"] == target_trace_id
assert narrow_rows[0]["data"]["span_id"] == target_root_span_id
assert not any(outside_range_msg in m for m in narrow_warnings), f"Did not expect outside-range warning, got {narrow_warnings}"
# --- Test 2: wide window (>1 h, clamp to the timerange from trace_summary) ---
# Should still return exactly one log — no duplicates from multi-bucket scan.
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_rows, wide_warnings = _query(wide_start_ms, now_ms, target_trace_id)
assert len(wide_rows) == 1, f"Expected 1 log for trace_id filter (wide window, multi-bucket), got {len(wide_rows)} — possible duplicate-log regression"
assert wide_rows[0]["data"]["trace_id"] == target_trace_id
assert wide_rows[0]["data"]["span_id"] == target_root_span_id
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
# --- Test 3: window that does not contain the trace returns no results + warning ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_rows, past_warnings = _query(past_start_ms, past_end_ms, target_trace_id)
assert len(past_rows) == 0, f"Expected 0 logs for trace_id filter outside time window, got {len(past_rows)}"
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
# --- Test 4: trace_id not present in trace_summary still returns logs (no warning) ---
orphan_rows, orphan_warnings = _query(narrow_start_ms, now_ms, orphan_trace_id)
assert len(orphan_rows) == 1, f"Expected 1 log for orphan trace_id (no trace_summary entry), got {len(orphan_rows)} — logs query may have been incorrectly short-circuited"
assert orphan_rows[0]["data"]["trace_id"] == orphan_trace_id
assert not any(outside_range_msg in m for m in orphan_warnings), f"Did not expect outside-range warning for orphan trace_id, got {orphan_warnings}"
def test_logs_aggregation_filter_by_trace_id(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[list[Logs]], None],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Tests that the trace_id time-range optimization also applies to
non-window-list (time_series / aggregation) logs queries:
1. Wide query window containing the trace returns the correct count.
2. Query window outside the trace's time range short-circuits to an
empty result.
3. A trace_id with no row in trace_summary (e.g. traces disabled) still
returns the matching logs — the lookup miss must not short-circuit
logs aggregation queries.
"""
target_trace_id = TraceIdGenerator.trace_id()
orphan_trace_id = TraceIdGenerator.trace_id()
target_root_span_id = TraceIdGenerator.span_id()
target_child_span_id = TraceIdGenerator.span_id()
orphan_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "logs-trace-agg-service",
"cloud.provider": "integration",
}
# trace_summary records min/max of span timestamps (it ignores duration),
# so insert two spans to give the trace a recorded window wide enough to
# comfortably contain the log timestamps below.
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_root_span_id,
parent_span_id="",
name="root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
Traces(
timestamp=now - timedelta(seconds=5),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_child_span_id,
parent_span_id=target_root_span_id,
name="child-span",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
# Two logs for the target trace_id, both inside the recorded trace window.
# One additional log carries an orphan trace_id with no row in
# trace_summary — used to verify that the lookup miss does not
# short-circuit logs aggregations.
insert_logs(
[
Logs(
timestamp=now - timedelta(seconds=7),
resources=common_resources,
attributes={},
body="log A inside trace window",
severity_text="INFO",
trace_id=target_trace_id,
span_id=target_root_span_id,
),
Logs(
timestamp=now - timedelta(seconds=6),
resources=common_resources,
attributes={},
body="log B inside trace window",
severity_text="INFO",
trace_id=target_trace_id,
span_id=target_root_span_id,
),
Logs(
timestamp=now - timedelta(seconds=2),
resources=common_resources,
attributes={},
body="log with a trace_id absent from trace_summary",
severity_text="INFO",
trace_id=orphan_trace_id,
span_id=orphan_span_id,
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
def _count(start_ms: int, end_ms: int, trace_id: str) -> tuple[float, list[str]]:
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="time_series",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"stepInterval": 60,
"disabled": False,
"filter": {"expression": f"trace_id = '{trace_id}'"},
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
warning = (response.json().get("data") or {}).get("warning") or {}
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
aggregations = results[0].get("aggregations") or []
if not aggregations:
return 0, messages
series = aggregations[0].get("series") or []
if not series:
return 0, messages
return sum(v["value"] for v in series[0]["values"]), messages
outside_range_msg = "lies outside the selected time range"
now_ms = int(now.timestamp() * 1000)
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
# --- Test 1: wide window (>1 h) containing the trace returns 2 logs ---
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_count, wide_warnings = _count(wide_start_ms, now_ms, target_trace_id)
assert wide_count == 2, f"Expected count=2 for trace_id aggregation (wide window), got {wide_count}"
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
# --- Test 2: window outside the trace short-circuits to empty + warning ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_count, past_warnings = _count(past_start_ms, past_end_ms, target_trace_id)
assert past_count == 0, f"Expected count=0 for trace_id aggregation outside time window, got {past_count}"
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
# --- Test 3: trace_id not present in trace_summary still returns logs (no warning) ---
orphan_count, orphan_warnings = _count(narrow_start_ms, now_ms, orphan_trace_id)
assert orphan_count == 1, f"Expected count=1 for orphan trace_id aggregation, got {orphan_count} — query may have been incorrectly short-circuited"
assert not any(outside_range_msg in m for m in orphan_warnings), f"Did not expect outside-range warning for orphan trace_id, got {orphan_warnings}"

View File

@@ -2062,7 +2062,7 @@ def test_traces_list_filter_by_trace_id(
trace_filter = f"trace_id = '{target_trace_id}'"
def _query(start_ms: int, end_ms: int) -> list:
def _query(start_ms: int, end_ms: int) -> tuple[list, list[str]]:
response = make_query_request(
signoz,
token,
@@ -2096,30 +2096,157 @@ def test_traces_list_filter_by_trace_id(
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
return response.json()["data"]["data"]["results"][0]["rows"] or []
rows = response.json()["data"]["data"]["results"][0]["rows"] or []
warning = (response.json().get("data") or {}).get("warning") or {}
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
return rows, messages
outside_range_msg = "lies outside the selected time range"
now_ms = int(now.timestamp() * 1000)
# --- Test 1: narrow window (single bucket, <1 h) ---
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
narrow_rows = _query(narrow_start_ms, now_ms)
narrow_rows, narrow_warnings = _query(narrow_start_ms, now_ms)
assert len(narrow_rows) == 1, f"Expected 1 span for trace_id filter (narrow window), got {len(narrow_rows)}"
assert narrow_rows[0]["data"]["span_id"] == span_id_root
assert narrow_rows[0]["data"]["trace_id"] == target_trace_id
assert not any(outside_range_msg in m for m in narrow_warnings), f"Did not expect outside-range warning, got {narrow_warnings}"
# --- Test 2: wide window (>1 h, triggers multiple exponential buckets) ---
# should just return 1 span, not duplicate
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_rows = _query(wide_start_ms, now_ms)
wide_rows, wide_warnings = _query(wide_start_ms, now_ms)
assert len(wide_rows) == 1, f"Expected 1 span for trace_id filter (wide window, multi-bucket), got {len(wide_rows)} — possible duplicate-span regression"
assert wide_rows[0]["data"]["span_id"] == span_id_root
assert wide_rows[0]["data"]["trace_id"] == target_trace_id
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
# --- Test 3: window that does not contain the trace returns no results ---
# --- Test 3: window that does not contain the trace returns no results + warning ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_rows = _query(past_start_ms, past_end_ms)
past_rows, past_warnings = _query(past_start_ms, past_end_ms)
assert len(past_rows) == 0, f"Expected 0 spans for trace_id filter outside time window, got {len(past_rows)}"
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
def test_traces_aggregation_filter_by_trace_id(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Tests that the trace_id time-range optimization also applies to
non-window-list (time_series / aggregation) traces queries:
1. Wide query window containing the trace returns the correct count.
2. Query window outside the trace's time range short-circuits to empty.
3. Filter referencing a trace_id with no row in trace_summary
short-circuits to empty (trace_summary is authoritative for traces).
"""
target_trace_id = TraceIdGenerator.trace_id()
target_root_span_id = TraceIdGenerator.span_id()
target_child_span_id = TraceIdGenerator.span_id()
missing_trace_id = TraceIdGenerator.trace_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "traces-agg-filter-service",
"cloud.provider": "integration",
}
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=5),
trace_id=target_trace_id,
span_id=target_root_span_id,
parent_span_id="",
name="root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={"http.request.method": "GET"},
),
Traces(
timestamp=now - timedelta(seconds=9),
duration=timedelta(seconds=1),
trace_id=target_trace_id,
span_id=target_child_span_id,
parent_span_id=target_root_span_id,
name="child-span",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
def _count(start_ms: int, end_ms: int, trace_id: str) -> tuple[float, list[str]]:
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="time_series",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"stepInterval": 60,
"disabled": False,
"filter": {"expression": f"trace_id = '{trace_id}'"},
"aggregations": [{"expression": "count()"}],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
warning = (response.json().get("data") or {}).get("warning") or {}
messages = [w.get("message", "") for w in (warning.get("warnings") or [])]
aggregations = results[0].get("aggregations") or []
if not aggregations:
return 0, messages
series = aggregations[0].get("series") or []
if not series:
return 0, messages
return sum(v["value"] for v in series[0]["values"]), messages
outside_range_msg = "lies outside the selected time range"
now_ms = int(now.timestamp() * 1000)
# --- Test 1: wide window (>1 h) containing the trace returns both spans ---
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_count, wide_warnings = _count(wide_start_ms, now_ms, target_trace_id)
assert wide_count == 2, f"Expected count=2 for trace_id aggregation (wide window), got {wide_count}"
assert not any(outside_range_msg in m for m in wide_warnings), f"Did not expect outside-range warning, got {wide_warnings}"
# --- Test 2: window outside the trace short-circuits to empty + warning ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_count, past_warnings = _count(past_start_ms, past_end_ms, target_trace_id)
assert past_count == 0, f"Expected count=0 for trace_id aggregation outside time window, got {past_count}"
assert any(outside_range_msg in m for m in past_warnings), f"Expected outside-range warning, got warnings={past_warnings}"
# --- Test 3: trace_id with no entry in trace_summary short-circuits (no warning) ---
missing_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
missing_count, missing_warnings = _count(missing_start_ms, now_ms, missing_trace_id)
assert missing_count == 0, f"Expected count=0 for trace_id absent from trace_summary, got {missing_count}"
assert not any(outside_range_msg in m for m in missing_warnings), f"Did not expect outside-range warning for missing trace_id, got {missing_warnings}"