mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-27 04:10:28 +01:00
Compare commits
6 Commits
traceop-re
...
ns/flamegr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
168b2eaa9c | ||
|
|
6b613f18a3 | ||
|
|
1b0447181d | ||
|
|
20edff4771 | ||
|
|
2048ef3d2f | ||
|
|
53c551359e |
@@ -434,6 +434,17 @@ tracedetail:
|
||||
max_depth_to_auto_expand: 5
|
||||
# Threshold below which all spans are returned without windowing.
|
||||
max_limit_to_select_all_spans: 10000
|
||||
flamegraph:
|
||||
# Maximum number of BFS depth levels included in a windowed response.
|
||||
max_selected_levels: 50
|
||||
# Maximum spans per level before sampling is applied.
|
||||
max_spans_per_level: 100
|
||||
# Number of highest-latency spans always included when sampling a level.
|
||||
sampling_top_latency_count: 5
|
||||
# Number of timestamp buckets used for uniform sampling within a level.
|
||||
sampling_bucket_count: 50
|
||||
# Threshold below which all spans are returned without windowing or sampling.
|
||||
select_all_spans_limit: 100000
|
||||
|
||||
##################### Authz #################################
|
||||
authz:
|
||||
|
||||
@@ -48,5 +48,23 @@ func (provider *provider) addTraceDetailRoutes(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v3/traces/{traceID}/flamegraph", handler.New(
|
||||
provider.authzMiddleware.ViewAccess(provider.traceDetailHandler.GetFlamegraph),
|
||||
handler.OpenAPIDef{
|
||||
ID: "GetFlamegraph",
|
||||
Tags: []string{"tracedetail"},
|
||||
Summary: "Get flamegraph view for a trace",
|
||||
Description: "Returns the flamegraph view of spans for a given trace ID.",
|
||||
Request: new(spantypes.PostableFlamegraph),
|
||||
RequestContentType: "application/json",
|
||||
Response: new(spantypes.GettableFlamegraphTrace),
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
|
||||
},
|
||||
)).Methods(http.MethodPost).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -6,7 +6,16 @@ import (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Waterfall WaterfallConfig `mapstructure:"waterfall"`
|
||||
Waterfall WaterfallConfig `mapstructure:"waterfall"`
|
||||
Flamegraph FlamegraphConfig `mapstructure:"flamegraph"`
|
||||
}
|
||||
|
||||
// FlamegraphConfig bounds how much of a trace the flamegraph view returns.
type FlamegraphConfig struct {
	// MaxSelectedLevels is the maximum number of BFS depth levels included in a
	// windowed response.
	MaxSelectedLevels int `mapstructure:"max_selected_levels"`
	// MaxSpansPerLevel is the maximum number of spans a level may hold before
	// sampling is applied to it.
	MaxSpansPerLevel int `mapstructure:"max_spans_per_level"`
	// SamplingTopLatencySpansCount is the number of highest-latency spans always
	// included when a level is sampled.
	SamplingTopLatencySpansCount int `mapstructure:"sampling_top_latency_count"`
	// SamplingBucketCount is the number of timestamp buckets used when sampling
	// a level.
	SamplingBucketCount int `mapstructure:"sampling_bucket_count"`
	// SelectAllSpansLimit is the span count up to which a trace is returned in
	// full, without windowing or sampling.
	SelectAllSpansLimit uint `mapstructure:"select_all_spans_limit"`
}
|
||||
|
||||
type WaterfallConfig struct {
|
||||
@@ -29,6 +38,13 @@ func newConfig() factory.Config {
|
||||
MaxDepthToAutoExpand: 5,
|
||||
MaxLimitToSelectAllSpans: 10_000,
|
||||
},
|
||||
Flamegraph: FlamegraphConfig{
|
||||
MaxSelectedLevels: 50,
|
||||
MaxSpansPerLevel: 100,
|
||||
SamplingTopLatencySpansCount: 5,
|
||||
SamplingBucketCount: 50,
|
||||
SelectAllSpansLimit: 100_000,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,5 +61,25 @@ func (c Config) Validate() error {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput,
|
||||
"tracedetail.waterfall.max_limit_to_select_all_spans must be positive")
|
||||
}
|
||||
if c.Flamegraph.MaxSelectedLevels <= 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput,
|
||||
"tracedetail.flamegraph.level_limit must be positive, got %d", c.Flamegraph.MaxSelectedLevels)
|
||||
}
|
||||
if c.Flamegraph.MaxSpansPerLevel <= 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput,
|
||||
"tracedetail.flamegraph.spans_per_level must be positive, got %d", c.Flamegraph.MaxSpansPerLevel)
|
||||
}
|
||||
if c.Flamegraph.SamplingTopLatencySpansCount < 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput,
|
||||
"tracedetail.flamegraph.top_latency_count cannot be negative, got %d", c.Flamegraph.SamplingTopLatencySpansCount)
|
||||
}
|
||||
if c.Flamegraph.SamplingBucketCount <= 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput,
|
||||
"tracedetail.flamegraph.bucket_count must be positive, got %d", c.Flamegraph.SamplingBucketCount)
|
||||
}
|
||||
if c.Flamegraph.SelectAllSpansLimit == 0 {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput,
|
||||
"tracedetail.flamegraph.max_limit_to_select_all_spans must be positive")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -59,3 +59,19 @@ func (h *handler) GetWaterfallV4(rw http.ResponseWriter, r *http.Request) {
|
||||
|
||||
render.Success(rw, http.StatusOK, result)
|
||||
}
|
||||
|
||||
func (h *handler) GetFlamegraph(rw http.ResponseWriter, r *http.Request) {
|
||||
req := new(spantypes.PostableFlamegraph)
|
||||
if err := binding.JSON.BindBody(r.Body, req); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.module.GetFlamegraph(r.Context(), mux.Vars(r)["traceID"], req)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, result)
|
||||
}
|
||||
|
||||
@@ -105,6 +105,64 @@ func (m *module) getFullWaterfall(ctx context.Context, traceID string, summary *
|
||||
return spantypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, nil, true, nil), nil
|
||||
}
|
||||
|
||||
func (m *module) GetFlamegraph(ctx context.Context, traceID string, req *spantypes.PostableFlamegraph) (*spantypes.GettableFlamegraphTrace, error) {
|
||||
summary, err := m.store.GetTraceSummary(ctx, traceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if summary.NumSpans <= uint64(m.config.Flamegraph.SelectAllSpansLimit) {
|
||||
return m.getFullFlamegraph(ctx, traceID, summary)
|
||||
}
|
||||
return m.getWindowedFlamegraph(ctx, traceID, req.SelectedSpanID, summary)
|
||||
}
|
||||
|
||||
func (m *module) getFullFlamegraph(ctx context.Context, traceID string, summary *spantypes.TraceSummary) (*spantypes.GettableFlamegraphTrace, error) {
|
||||
fullSpans, err := m.store.GetTraceSpans(ctx, traceID, summary)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(fullSpans) == 0 {
|
||||
return nil, spantypes.ErrTraceNotFound
|
||||
}
|
||||
flamegraphTrace := spantypes.NewFlamegraphTraceFromStorable(fullSpans)
|
||||
return spantypes.NewGettableFlamegraphTrace(
|
||||
flamegraphTrace.GetAllLevels(),
|
||||
summary.Start.UnixMilli(), summary.End.UnixMilli(), false,
|
||||
), nil
|
||||
}
|
||||
|
||||
// getWindowedFlamegraph returns a window of a max levels and max sampled spans per level around the selected span
|
||||
func (m *module) getWindowedFlamegraph(ctx context.Context, traceID, selectedSpanID string, summary *spantypes.TraceSummary) (*spantypes.GettableFlamegraphTrace, error) {
|
||||
minimalSpans, err := m.store.GetMinimalSpans(ctx, traceID, summary.Start, summary.End)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(minimalSpans) == 0 {
|
||||
return nil, spantypes.ErrTraceNotFound
|
||||
}
|
||||
|
||||
flamegraphTrace := spantypes.NewFlamegraphTraceFromMinimal(minimalSpans)
|
||||
minimalSpans = nil
|
||||
|
||||
cfg := m.config.Flamegraph
|
||||
selectedSpans := flamegraphTrace.GetSelectedLevels(selectedSpanID,
|
||||
cfg.MaxSelectedLevels, cfg.MaxSpansPerLevel, cfg.SamplingTopLatencySpansCount, cfg.SamplingBucketCount)
|
||||
if len(selectedSpans) == 0 {
|
||||
return nil, spantypes.ErrTraceNotFound
|
||||
}
|
||||
|
||||
fullSpans, err := m.store.GetTraceSpansByIDs(ctx, traceID, summary.Start, summary.End,
|
||||
spantypes.FlamegraphWindowSpanIDs(selectedSpans))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return spantypes.NewGettableFlamegraphTrace(
|
||||
flamegraphTrace.EnrichSelectedSpans(selectedSpans, fullSpans),
|
||||
summary.Start.UnixMilli(), summary.End.UnixMilli(), true,
|
||||
), nil
|
||||
}
|
||||
|
||||
// getWindowedWaterfall builds the waterfall tree with minimal data and then returns only a window of full spans.
|
||||
func (m *module) getWindowedWaterfall(ctx context.Context, traceID, selectedSpanID string, uncollapsedSpans []string, start, end time.Time) (*spantypes.GettableWaterfallTrace, error) {
|
||||
// Step 1: minimal fetch → build full tree → select visible window
|
||||
|
||||
@@ -11,10 +11,12 @@ import (
|
||||
type Handler interface {
	GetWaterfall(rw http.ResponseWriter, r *http.Request)
	GetWaterfallV4(rw http.ResponseWriter, r *http.Request)
	// GetFlamegraph serves the flamegraph view of a trace.
	GetFlamegraph(rw http.ResponseWriter, r *http.Request)
}
|
||||
|
||||
// Module defines the business logic for trace detail operations.
|
||||
type Module interface {
|
||||
GetWaterfall(ctx context.Context, traceID string, req *spantypes.PostableWaterfall) (*spantypes.GettableWaterfallTrace, error)
|
||||
GetWaterfallV4(ctx context.Context, traceID string, selectedSpanID string, uncollapsedSpans []string, selectAllLimit uint) (*spantypes.GettableWaterfallTrace, error)
|
||||
GetFlamegraph(ctx context.Context, traceID string, req *spantypes.PostableFlamegraph) (*spantypes.GettableFlamegraphTrace, error)
|
||||
}
|
||||
|
||||
@@ -70,31 +70,12 @@ func (b *traceOperatorCTEBuilder) build(ctx context.Context, requestType qbtypes
|
||||
|
||||
selectFromCTE := rootCTEName
|
||||
if b.operator.ReturnSpansFrom != "" {
|
||||
sourceQueryCTE := b.queryToCTEName[b.operator.ReturnSpansFrom]
|
||||
if sourceQueryCTE == "" {
|
||||
selectFromCTE = b.queryToCTEName[b.operator.ReturnSpansFrom]
|
||||
if selectFromCTE == "" {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
|
||||
"returnSpansFrom references query '%s' which has no corresponding CTE",
|
||||
b.operator.ReturnSpansFrom)
|
||||
}
|
||||
filteredCTEName := fmt.Sprintf("__return_from_%s", b.operator.ReturnSpansFrom)
|
||||
|
||||
// rootCTEName holds one row per matching *span*, not per *trace*, so it can
|
||||
// contain many rows for the same trace_id. DISTINCT de-duplicates that set
|
||||
// before ClickHouse builds the hash table for the IN check, keeping memory
|
||||
// usage proportional to the number of distinct traces rather than spans.
|
||||
matchingTracedSB := sqlbuilder.NewSelectBuilder()
|
||||
matchingTracedSB.Select("DISTINCT trace_id")
|
||||
matchingTracedSB.From(rootCTEName)
|
||||
matchedTracesSQL, matchedTracesArgs := matchingTracedSB.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
filteredSB := sqlbuilder.NewSelectBuilder()
|
||||
filteredSB.Select("*")
|
||||
filteredSB.From(sourceQueryCTE)
|
||||
filteredSB.Where(fmt.Sprintf("trace_id IN (%s)", matchedTracesSQL))
|
||||
filteredSQL, filteredArgs := filteredSB.BuildWithFlavor(sqlbuilder.ClickHouse, matchedTracesArgs...)
|
||||
|
||||
b.addCTE(filteredCTEName, filteredSQL, filteredArgs, []string{sourceQueryCTE, rootCTEName})
|
||||
selectFromCTE = filteredCTEName
|
||||
}
|
||||
|
||||
finalStmt, err := b.buildFinalQuery(ctx, selectFromCTE, requestType)
|
||||
|
||||
@@ -385,82 +385,6 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "returnSpansFrom B: A -> B return B spans filtered by operator",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
operator: qbtypes.QueryBuilderTraceOperator{
|
||||
Expression: "A -> B",
|
||||
ReturnSpansFrom: "B",
|
||||
Limit: 10,
|
||||
},
|
||||
compositeQuery: &qbtypes.CompositeQuery{
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "A",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "B",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? 
AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __return_from_B AS (SELECT * FROM B WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "returnSpansFrom C: (A -> B) && C return C spans filtered by operator",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
operator: qbtypes.QueryBuilderTraceOperator{
|
||||
Expression: "(A -> B) && C",
|
||||
ReturnSpansFrom: "C",
|
||||
Limit: 10,
|
||||
},
|
||||
compositeQuery: &qbtypes.CompositeQuery{
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "A",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "B",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "C",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'auth'"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __resource_filter_C AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? 
AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), C AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_C) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B_AND_C AS (SELECT l.* FROM A_INDIR_DESC_B AS l INNER JOIN C AS r ON l.trace_id = r.trace_id), __return_from_C AS (SELECT * FROM C WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B_AND_C)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_C ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "auth", "%service.name%", "%service.name\":\"auth%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
|
||||
81
pkg/types/spantypes/flamegraph_span.go
Normal file
81
pkg/types/spantypes/flamegraph_span.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"maps"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
type FlamegraphSpan struct {
|
||||
SpanID string `json:"spanId"`
|
||||
ParentSpanID string `json:"parentSpanId"`
|
||||
Timestamp uint64 `json:"timestamp"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
HasError bool `json:"hasError"`
|
||||
ServiceName string `json:"serviceName"`
|
||||
Name string `json:"name"`
|
||||
Level int64 `json:"level"`
|
||||
Events []Event `json:"event"`
|
||||
Attributes map[string]any `json:"attributes,omitempty"`
|
||||
Resource map[string]string `json:"resource,omitempty"`
|
||||
Children []*FlamegraphSpan `json:"-"` // internal tree use only
|
||||
}
|
||||
|
||||
// FlamegraphLevel groups span IDs at a single level within the selected window.
type FlamegraphLevel struct {
	// Level is the depth shared by every span listed in SpanIDs.
	Level int64
	// SpanIDs lists the spans selected for this level.
	SpanIDs []string
}
|
||||
|
||||
type PostableFlamegraph struct {
|
||||
SelectedSpanID string `json:"selectedSpanId"`
|
||||
SelectFields []telemetrytypes.TelemetryFieldKey `json:"selectFields,omitempty"`
|
||||
}
|
||||
|
||||
// GettableFlamegraphTrace is the response for the v3 flamegraph API.
|
||||
type GettableFlamegraphTrace struct {
|
||||
Spans [][]*FlamegraphSpan `json:"spans"`
|
||||
StartTimestampMillis int64 `json:"startTimestampMillis"`
|
||||
EndTimestampMillis int64 `json:"endTimestampMillis"`
|
||||
HasMore bool `json:"hasMore"`
|
||||
}
|
||||
|
||||
func NewGettableFlamegraphTrace(spans [][]*FlamegraphSpan, startMs, endMs int64, hasMore bool) *GettableFlamegraphTrace {
|
||||
return &GettableFlamegraphTrace{
|
||||
Spans: spans,
|
||||
StartTimestampMillis: startMs,
|
||||
EndTimestampMillis: endMs,
|
||||
HasMore: hasMore,
|
||||
}
|
||||
}
|
||||
|
||||
func NewFlamegraphSpanFromStorable(s *StorableSpan, level int64) *FlamegraphSpan {
|
||||
resources := make(map[string]string, len(s.ResourcesString))
|
||||
maps.Copy(resources, s.ResourcesString)
|
||||
return &FlamegraphSpan{
|
||||
SpanID: s.SpanID,
|
||||
ParentSpanID: s.ParentSpanID,
|
||||
Timestamp: uint64(s.StartTime.UnixNano()),
|
||||
DurationNano: s.DurationNano,
|
||||
HasError: s.HasError,
|
||||
ServiceName: s.ServiceName,
|
||||
Name: s.Name,
|
||||
Level: level,
|
||||
Events: s.UnmarshalledEvents(),
|
||||
Attributes: s.Attributes(),
|
||||
Resource: resources,
|
||||
}
|
||||
}
|
||||
|
||||
// FlamegraphWindowSpanIDs collects all span IDs from a level window into a flat slice.
|
||||
func FlamegraphWindowSpanIDs(window []FlamegraphLevel) []string {
|
||||
total := 0
|
||||
for _, lvl := range window {
|
||||
total += len(lvl.SpanIDs)
|
||||
}
|
||||
ids := make([]string, 0, total)
|
||||
for _, lvl := range window {
|
||||
ids = append(ids, lvl.SpanIDs...)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
279
pkg/types/spantypes/flamegraph_trace.go
Normal file
279
pkg/types/spantypes/flamegraph_trace.go
Normal file
@@ -0,0 +1,279 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"sort"
|
||||
)
|
||||
|
||||
// FlamegraphTrace holds the level wise tree built from minimal spans.
|
||||
type FlamegraphTrace struct {
|
||||
roots []*FlamegraphSpan
|
||||
nodeByID map[string]*FlamegraphSpan
|
||||
startTime uint64
|
||||
endTime uint64
|
||||
}
|
||||
|
||||
func NewFlamegraphTraceFromMinimal(spans []MinimalSpan) *FlamegraphTrace {
|
||||
t := &FlamegraphTrace{
|
||||
nodeByID: make(map[string]*FlamegraphSpan, len(spans)),
|
||||
}
|
||||
for i := range spans {
|
||||
node := spans[i].ToFlamegraphSpan()
|
||||
t.updateTimeRange(node.Timestamp, node.DurationNano)
|
||||
t.nodeByID[node.SpanID] = node
|
||||
}
|
||||
t.wireTree()
|
||||
return t
|
||||
}
|
||||
|
||||
func NewFlamegraphTraceFromStorable(spans []StorableSpan) *FlamegraphTrace {
|
||||
t := &FlamegraphTrace{
|
||||
nodeByID: make(map[string]*FlamegraphSpan, len(spans)),
|
||||
}
|
||||
for i := range spans {
|
||||
node := NewFlamegraphSpanFromStorable(&spans[i], 0) // level is set later by BFS
|
||||
t.updateTimeRange(node.Timestamp, node.DurationNano)
|
||||
t.nodeByID[node.SpanID] = node
|
||||
}
|
||||
t.wireTree()
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *FlamegraphTrace) GetAllLevels() [][]*FlamegraphSpan {
|
||||
allLevels := t.buildAllLevels()
|
||||
for _, node := range t.nodeByID {
|
||||
node.Children = nil // children not required after building tree
|
||||
}
|
||||
return allLevels
|
||||
}
|
||||
|
||||
// GetSelectedLevels returns the level window for selectedSpanID with sampling applied to
|
||||
// dense levels. It always applies windowing — callers should only invoke this when the
|
||||
// trace is known to exceed the select-all limit.
|
||||
// Children are cleared after traversal so the tree can be GC'd.
|
||||
func (t *FlamegraphTrace) GetSelectedLevels(
|
||||
selectedSpanID string,
|
||||
levelLimit, spansPerLevel, topLatencyCount, bucketCount int,
|
||||
) []FlamegraphLevel {
|
||||
allLevels := t.buildAllLevels()
|
||||
for _, node := range t.nodeByID {
|
||||
node.Children = nil
|
||||
}
|
||||
|
||||
selectedIndex := 0
|
||||
if selectedSpanID != "" {
|
||||
outer:
|
||||
for i, lvl := range allLevels {
|
||||
for _, span := range lvl {
|
||||
if span.SpanID == selectedSpanID {
|
||||
selectedIndex = i
|
||||
break outer
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lowerLimit := selectedIndex - int(float64(levelLimit)*0.4)
|
||||
upperLimit := selectedIndex + int(float64(levelLimit)*0.6)
|
||||
|
||||
if lowerLimit < 0 {
|
||||
upperLimit -= lowerLimit
|
||||
lowerLimit = 0
|
||||
}
|
||||
if upperLimit > len(allLevels) {
|
||||
lowerLimit -= upperLimit - len(allLevels)
|
||||
upperLimit = len(allLevels)
|
||||
}
|
||||
if lowerLimit < 0 {
|
||||
lowerLimit = 0
|
||||
}
|
||||
|
||||
result := make([]FlamegraphLevel, 0, upperLimit-lowerLimit)
|
||||
for i := lowerLimit; i < upperLimit; i++ {
|
||||
lvl := allLevels[i]
|
||||
if len(lvl) == 0 {
|
||||
continue
|
||||
}
|
||||
var sampled []*FlamegraphSpan
|
||||
if len(lvl) > spansPerLevel {
|
||||
sampled = sampleFlamegraphLevel(lvl, selectedSpanID, i == selectedIndex,
|
||||
t.startTime, t.endTime, topLatencyCount, bucketCount)
|
||||
} else {
|
||||
sampled = lvl
|
||||
}
|
||||
if len(sampled) == 0 {
|
||||
continue
|
||||
}
|
||||
spanIDs := make([]string, len(sampled))
|
||||
for j, s := range sampled {
|
||||
spanIDs[j] = s.SpanID
|
||||
}
|
||||
result = append(result, FlamegraphLevel{
|
||||
Level: sampled[0].Level,
|
||||
SpanIDs: spanIDs,
|
||||
})
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (t *FlamegraphTrace) EnrichSelectedSpans(selectedSpans []FlamegraphLevel, fullSpans []StorableSpan) [][]*FlamegraphSpan {
|
||||
fullByID := make(map[string]*StorableSpan, len(fullSpans))
|
||||
for i := range fullSpans {
|
||||
fullByID[fullSpans[i].SpanID] = &fullSpans[i]
|
||||
}
|
||||
|
||||
result := make([][]*FlamegraphSpan, len(selectedSpans))
|
||||
for i, lvl := range selectedSpans {
|
||||
result[i] = make([]*FlamegraphSpan, 0, len(lvl.SpanIDs))
|
||||
for _, spanID := range lvl.SpanIDs {
|
||||
if full, ok := fullByID[spanID]; ok {
|
||||
result[i] = append(result[i], NewFlamegraphSpanFromStorable(full, lvl.Level))
|
||||
} else if lean, ok := t.nodeByID[spanID]; ok {
|
||||
result[i] = append(result[i], lean)
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (t *FlamegraphTrace) updateTimeRange(timestamp, durationNano uint64) {
|
||||
if t.startTime == 0 || timestamp < t.startTime {
|
||||
t.startTime = timestamp
|
||||
}
|
||||
if end := timestamp + durationNano; end > t.endTime {
|
||||
t.endTime = end
|
||||
}
|
||||
}
|
||||
|
||||
func (t *FlamegraphTrace) wireTree() {
|
||||
for _, node := range t.nodeByID {
|
||||
if node.ParentSpanID != "" {
|
||||
if parent, ok := t.nodeByID[node.ParentSpanID]; ok {
|
||||
parent.Children = append(parent.Children, node)
|
||||
} else {
|
||||
missing := &FlamegraphSpan{
|
||||
SpanID: node.ParentSpanID,
|
||||
Name: "Missing Span",
|
||||
Timestamp: node.Timestamp,
|
||||
DurationNano: node.DurationNano,
|
||||
Children: []*FlamegraphSpan{node},
|
||||
}
|
||||
t.nodeByID[missing.SpanID] = missing
|
||||
t.roots = append(t.roots, missing)
|
||||
}
|
||||
} else if flamegraphSpanIndex(t.roots, node.SpanID) == -1 {
|
||||
t.roots = append(t.roots, node)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(t.roots, func(i, j int) bool {
|
||||
if t.roots[i].Timestamp == t.roots[j].Timestamp {
|
||||
return t.roots[i].SpanID < t.roots[j].SpanID
|
||||
}
|
||||
return t.roots[i].Timestamp < t.roots[j].Timestamp
|
||||
})
|
||||
}
|
||||
|
||||
func (t *FlamegraphTrace) buildAllLevels() [][]*FlamegraphSpan {
|
||||
var result [][]*FlamegraphSpan
|
||||
|
||||
type entry struct {
|
||||
node *FlamegraphSpan
|
||||
depth int64
|
||||
}
|
||||
|
||||
for _, root := range t.roots {
|
||||
levelMap := make(map[int64][]*FlamegraphSpan)
|
||||
maxDepth := int64(-1)
|
||||
|
||||
queue := []entry{{root, 0}}
|
||||
for len(queue) > 0 {
|
||||
curr := queue[0]
|
||||
queue = queue[1:]
|
||||
curr.node.Level = curr.depth
|
||||
levelMap[curr.depth] = append(levelMap[curr.depth], curr.node)
|
||||
if curr.depth > maxDepth {
|
||||
maxDepth = curr.depth
|
||||
}
|
||||
for _, child := range curr.node.Children {
|
||||
queue = append(queue, entry{child, curr.depth + 1})
|
||||
}
|
||||
}
|
||||
|
||||
for depth := int64(0); depth <= maxDepth; depth++ {
|
||||
if spans, ok := levelMap[depth]; ok {
|
||||
result = append(result, spans)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func sampleFlamegraphLevel(
|
||||
spans []*FlamegraphSpan,
|
||||
selectedSpanID string,
|
||||
isSelectedLevel bool,
|
||||
startTime, endTime uint64,
|
||||
topLatencyCount, bucketCount int,
|
||||
) []*FlamegraphSpan {
|
||||
sorted := make([]*FlamegraphSpan, len(spans))
|
||||
copy(sorted, spans)
|
||||
sort.Slice(sorted, func(i, j int) bool {
|
||||
return sorted[i].DurationNano > sorted[j].DurationNano
|
||||
})
|
||||
|
||||
var sampled []*FlamegraphSpan
|
||||
|
||||
topK := topLatencyCount
|
||||
if topK > len(sorted) {
|
||||
topK = len(sorted)
|
||||
}
|
||||
sampled = append(sampled, sorted[:topK]...)
|
||||
|
||||
if isSelectedLevel {
|
||||
for _, span := range sorted {
|
||||
if span.SpanID == selectedSpanID {
|
||||
sampled = append(sampled, span)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bucketSize := (endTime - startTime) / uint64(bucketCount)
|
||||
if bucketSize == 0 {
|
||||
bucketSize = 1
|
||||
}
|
||||
buckets := make([][]*FlamegraphSpan, bucketCount)
|
||||
for _, span := range sorted {
|
||||
if span.Timestamp < startTime || span.Timestamp > endTime {
|
||||
continue
|
||||
}
|
||||
idx := int((span.Timestamp - startTime) / bucketSize)
|
||||
if idx < 0 {
|
||||
idx = 0
|
||||
} else if idx >= bucketCount {
|
||||
idx = bucketCount - 1
|
||||
}
|
||||
buckets[idx] = append(buckets[idx], span)
|
||||
}
|
||||
for i := range buckets {
|
||||
if len(buckets[i]) > 2 {
|
||||
buckets[i] = buckets[i][:2]
|
||||
}
|
||||
}
|
||||
for _, bucket := range buckets {
|
||||
sampled = append(sampled, bucket...)
|
||||
}
|
||||
|
||||
return sampled
|
||||
}
|
||||
|
||||
func flamegraphSpanIndex(spans []*FlamegraphSpan, spanID string) int {
|
||||
for i, s := range spans {
|
||||
if s != nil && s.SpanID == spanID {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
@@ -156,6 +156,18 @@ func (item *MinimalSpan) ToWaterfallSpan(traceID string) *WaterfallSpan {
|
||||
}
|
||||
}
|
||||
|
||||
func (item *MinimalSpan) ToFlamegraphSpan() *FlamegraphSpan {
|
||||
return &FlamegraphSpan{
|
||||
SpanID: item.SpanID,
|
||||
ParentSpanID: item.ParentSpanID,
|
||||
Timestamp: uint64(item.StartTime.UnixNano()),
|
||||
DurationNano: item.DurationNano,
|
||||
HasError: item.HasError,
|
||||
ServiceName: item.ServiceName,
|
||||
Children: make([]*FlamegraphSpan, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// NewMissingWaterfallSpan creates a synthetic placeholder span for a parent that has no recorded data.
|
||||
func NewMissingWaterfallSpan(spanID, traceID string, timeUnixNano, durationNano uint64) *WaterfallSpan {
|
||||
return &WaterfallSpan{
|
||||
|
||||
3
tests/fixtures/querier.py
vendored
3
tests/fixtures/querier.py
vendored
@@ -72,7 +72,6 @@ class TraceOperatorQuery:
|
||||
return_spans_from: str | None = None
|
||||
limit: int | None = None
|
||||
order: list[OrderBy] | None = None
|
||||
select_fields: list[TelemetryFieldKey] | None = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
spec: dict[str, Any] = {
|
||||
@@ -85,8 +84,6 @@ class TraceOperatorQuery:
|
||||
spec["limit"] = self.limit
|
||||
if self.order:
|
||||
spec["order"] = [o.to_dict() if hasattr(o, "to_dict") else o for o in self.order]
|
||||
if self.select_fields:
|
||||
spec["selectFields"] = [f.to_dict() for f in self.select_fields]
|
||||
return {"type": "builder_trace_operator", "spec": spec}
|
||||
|
||||
|
||||
|
||||
@@ -1,436 +0,0 @@
|
||||
"""
|
||||
Integration tests for TraceOperatorQuery (builder_trace_operator) through the
|
||||
/api/v5/query_range endpoint.
|
||||
|
||||
Covers:
|
||||
1. Order-by variants (A -> B, A => B) with returnSpansFrom="A".
|
||||
Guards against the NOT_FOUND_COLUMN_IN_BLOCK regression where ordering by a
|
||||
column absent from an outer SELECT caused a query failure.
|
||||
2. Expression operators (=>, ->, &&, ||, A NOT B) with and without returnSpansFrom.
|
||||
|
||||
returnSpansFrom semantics
|
||||
--------------------------
|
||||
returnSpansFrom="" (default)
|
||||
The final rows come from the expression's root CTE. Only spans that
|
||||
directly satisfy the structural predicate are returned.
|
||||
|
||||
returnSpansFrom="A"
|
||||
The expression is still evaluated in full (the structural relationship
|
||||
must hold), but the final rows are drawn from the A sub-query CTE,
|
||||
filtered to traces that appeared in the expression result. Concretely:
|
||||
the query returns every A span whose trace_id belongs to a trace that
|
||||
matched the expression.
|
||||
"""
|
||||
|
||||
from collections.abc import Callable
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from http import HTTPStatus
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.querier import get_rows
|
||||
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
|
||||
|
||||
|
||||
def _names(response: requests.Response) -> set:
    """Return the set of ``row["data"]["name"]`` values over the rows of ``response``."""
    span_names = set()
    for row in get_rows(response):
        span_names.add(row["data"]["name"])
    return span_names
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Dataset — 4 traces using real OTel semantic-convention attributes
|
||||
#
|
||||
# Filter A = "http.method EXISTS" (HTTP entry-point spans)
|
||||
# Filter B = "db.system = 'redis'" (direct Redis cache calls)
|
||||
# / "db.system = 'postgresql'" (deeper DB queries, for indirect tests)
|
||||
# / "messaging.system = 'kafka'" (async consumer, for OR tests)
|
||||
#
|
||||
# T1 checkout-svc [SERVER] POST /checkout (5s) ← structural root, http.method=POST
|
||||
# ├─ proxy-svc [SERVER] api-proxy (3s) ← http.method=POST; no db children
|
||||
# └─ [CLIENT] lookup-cart (redis)
|
||||
# └─ [CLIENT] check-inventory (postgresql)
|
||||
#
|
||||
# T2 catalog-svc [SERVER] GET /catalog (1s) ← http.method=GET
|
||||
# └─ [CLIENT] fetch-catalog (redis)
|
||||
# └─ [CLIENT] read-cache (postgresql)
|
||||
#
|
||||
# T3 standalone-svc [SERVER] standalone-server ← http.method=POST; no db/cache children
|
||||
# T4 isolated-svc [CONSUMER] isolated-worker ← messaging.system=kafka; no http.method → not in A
|
||||
#
|
||||
# T1 has TWO spans matching filter A (http.method EXISTS); returnSpansFrom changes what is returned:
|
||||
# default → only spans that directly satisfy the structural predicate
|
||||
# return_A → all matching A spans from traces where the predicate held
|
||||
#
|
||||
# Expression truth table:
|
||||
# A -> B (indirect) A=http.method EXISTS B=db.system='postgresql' T1✓ T2✓ T3✗ T4✗
|
||||
# A => B (direct) A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
|
||||
# A && B A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
|
||||
# A || B A=http.method EXISTS B=messaging.system='kafka' T1✓ T2✓ T3✓ T4✓
|
||||
# A NOT B A=http.method EXISTS B=db.system='redis' T1✗ T2✗ T3✓ T4✗
|
||||
#
|
||||
# Order-by cases (all use returnSpansFrom=A, 3 rows expected):
|
||||
# ob.indirect A->B order http.method DESC → POST(checkout+proxy), GET(catalog)
|
||||
# ob.duration A=>B order duration_nano DESC → POST/checkout(5s), api-proxy(3s), GET/catalog(1s)
|
||||
# ob.select A=>B order http.method DESC → POST, POST, GET
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "case",
    [
        # Order by http.method DESC while only service.name is selected.
        # Regression guard for NOT_FOUND_COLUMN_IN_BLOCK (ordering by a column that
        # is not part of the outer SELECT). The two POST spans tie, so only their
        # membership in the first two rows is checked; the GET span must come last.
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "db.system = 'postgresql'",
                "expression": "A -> B",
                "return_spans_from": "A",
                "select_fields": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
                "order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
                "validate": lambda r: (
                    len(get_rows(r)) == 3
                    and {get_rows(r)[0]["data"]["service.name"], get_rows(r)[1]["data"]["service.name"]} == {"checkout-svc", "proxy-svc"}
                    and get_rows(r)[2]["data"]["service.name"] == "catalog-svc"
                ),
            },
            id="ob.indirect.http_method_not_in_select",
        ),
        # Order by the core span field duration_nano DESC.
        # Expected order: POST /checkout (5 s), api-proxy (3 s), GET /catalog (1 s).
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "db.system = 'redis'",
                "expression": "A => B",
                "return_spans_from": "A",
                "order": [{"key": {"name": "duration_nano", "fieldContext": "span"}, "direction": "desc"}],
                "validate": lambda r: (
                    len(get_rows(r)) == 3
                    and get_rows(r)[0]["data"]["name"] == "POST /checkout"
                    and get_rows(r)[1]["data"]["name"] == "api-proxy"
                    and get_rows(r)[2]["data"]["name"] == "GET /catalog"
                ),
            },
            id="ob.duration.duration_nano_desc",
        ),
        # Order by http.method DESC with http.method also selected, so the value
        # is present in every row: POST, POST, then GET.
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "db.system = 'redis'",
                "expression": "A => B",
                "return_spans_from": "A",
                "select_fields": [{"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}],
                "order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
                "validate": lambda r: (
                    len(get_rows(r)) == 3
                    and get_rows(r)[0]["data"]["http.method"] == "POST"
                    and get_rows(r)[1]["data"]["http.method"] == "POST"
                    and get_rows(r)[2]["data"]["http.method"] == "GET"
                ),
            },
            id="ob.select.http_method_in_select",
        ),
        # A => B (direct child), default return: only the A spans that directly
        # parent a redis span are expected (api-proxy has no redis child).
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "db.system = 'redis'",
                "expression": "A => B",
                "return_spans_from": "",
                "validate": lambda r: (
                    len(get_rows(r)) == 2
                    and _names(r) == {"POST /checkout", "GET /catalog"}
                ),
            },
            id="ex.direct_child.default",
        ),
        # A => B (direct child), returnSpansFrom="A": every A span of a matching
        # trace is expected, so api-proxy is included as well.
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "db.system = 'redis'",
                "expression": "A => B",
                "return_spans_from": "A",
                "validate": lambda r: (
                    len(get_rows(r)) == 3
                    and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"}
                ),
            },
            id="ex.direct_child.return_A",
        ),
        # A -> B (indirect descendant), default return: only the A spans that are
        # ancestors of a postgresql span are expected.
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "db.system = 'postgresql'",
                "expression": "A -> B",
                "return_spans_from": "",
                "validate": lambda r: (
                    len(get_rows(r)) == 2
                    and _names(r) == {"POST /checkout", "GET /catalog"}
                ),
            },
            id="ex.indirect_descendant.default",
        ),
        # A -> B (indirect descendant), returnSpansFrom="A": api-proxy is pulled in
        # because it is an A span in matching trace T1.
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "db.system = 'postgresql'",
                "expression": "A -> B",
                "return_spans_from": "A",
                "validate": lambda r: (
                    len(get_rows(r)) == 3
                    and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"}
                ),
            },
            id="ex.indirect_descendant.return_A",
        ),
        # A && B, default return: T1 and T2 contain both http.method and redis
        # spans; all their A spans (including api-proxy) are expected.
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "db.system = 'redis'",
                "expression": "A && B",
                "return_spans_from": "",
                "validate": lambda r: (
                    len(get_rows(r)) == 3
                    and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"}
                ),
            },
            id="ex.and.default",
        ),
        # A || B, default return: T1-T3 match through A and T4 through B (kafka);
        # the union of A and B spans is expected.
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "messaging.system = 'kafka'",
                "expression": "A || B",
                "return_spans_from": "",
                "validate": lambda r: (
                    len(get_rows(r)) == 5
                    and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server", "isolated-worker"}
                ),
            },
            id="ex.or.default",
        ),
        # A || B, returnSpansFrom="A": only A spans are expected; T4 has no
        # http.method span and therefore contributes no row.
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "messaging.system = 'kafka'",
                "expression": "A || B",
                "return_spans_from": "A",
                "validate": lambda r: (
                    len(get_rows(r)) == 4
                    and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server"}
                ),
            },
            id="ex.or.return_A",
        ),
        # A NOT B, default return: only T3 has an http.method span and no redis
        # span in its trace.
        pytest.param(
            {
                "filter_a": "http.method EXISTS",
                "filter_b": "db.system = 'redis'",
                "expression": "A NOT B",
                "return_spans_from": "",
                "validate": lambda r: (
                    len(get_rows(r)) == 1
                    and _names(r) == {"standalone-server"}
                ),
            },
            id="ex.not.default",
        ),
    ],
)
def test_trace_operator(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_traces: Callable[[list[Traces]], None],
    case: dict,
) -> None:
    """Run one trace-operator case against /api/v5/query_range.

    Inserts the four-trace dataset (T1-T4), posts builder queries A and B plus a
    ``builder_trace_operator`` query built from ``case``, and asserts an HTTP 200
    response that satisfies ``case["validate"]``.
    """
    # IDs that more than one span refers to (as its own id or as a parent id).
    t1_trace_id = TraceIdGenerator.trace_id()
    t1_checkout_span_id = TraceIdGenerator.span_id()  # POST /checkout, root of T1
    t1_child_span_id = TraceIdGenerator.span_id()  # lookup-cart

    t2_trace_id = TraceIdGenerator.trace_id()
    t2_root_span_id = TraceIdGenerator.span_id()
    t2_child_span_id = TraceIdGenerator.span_id()

    now = datetime.now(tz=UTC).replace(second=0, microsecond=0)

    def build_span(*, started_ago_s, duration_s, trace_id, span_id, parent_span_id, name, kind, service, attributes):
        # Every span in the dataset has an OK status with an empty message and
        # carries only service.name as a resource attribute.
        return Traces(
            timestamp=now - timedelta(seconds=started_ago_s),
            duration=timedelta(seconds=duration_s),
            trace_id=trace_id,
            span_id=span_id,
            parent_span_id=parent_span_id,
            name=name,
            kind=kind,
            status_code=TracesStatusCode.STATUS_CODE_OK,
            status_message="",
            resources={"service.name": service},
            attributes=attributes,
        )

    insert_traces(
        [
            # T1: POST /checkout (root) with three descendants. api-proxy also sets
            # http.method, so T1 holds two spans matching filter A; this is what
            # separates the default return from returnSpansFrom="A".
            build_span(
                started_ago_s=10,
                duration_s=5,
                trace_id=t1_trace_id,
                span_id=t1_checkout_span_id,
                parent_span_id="",
                name="POST /checkout",
                kind=TracesKind.SPAN_KIND_SERVER,
                service="checkout-svc",
                attributes={"http.method": "POST", "http.route": "/checkout"},
            ),
            build_span(
                started_ago_s=10,
                duration_s=3,
                trace_id=t1_trace_id,
                span_id=TraceIdGenerator.span_id(),
                parent_span_id=t1_checkout_span_id,
                name="api-proxy",
                kind=TracesKind.SPAN_KIND_SERVER,
                service="proxy-svc",
                attributes={"http.method": "POST", "http.route": "/proxy"},
            ),
            build_span(
                started_ago_s=9,
                duration_s=2,
                trace_id=t1_trace_id,
                span_id=t1_child_span_id,
                parent_span_id=t1_checkout_span_id,
                name="lookup-cart",
                kind=TracesKind.SPAN_KIND_CLIENT,
                service="checkout-svc",
                attributes={"db.system": "redis", "db.operation": "GET"},
            ),
            build_span(
                started_ago_s=8,
                duration_s=1,
                trace_id=t1_trace_id,
                span_id=TraceIdGenerator.span_id(),
                parent_span_id=t1_child_span_id,
                name="check-inventory",
                kind=TracesKind.SPAN_KIND_CLIENT,
                service="checkout-svc",
                attributes={"db.system": "postgresql", "db.operation": "SELECT"},
            ),
            # T2: GET /catalog -> fetch-catalog (redis) -> read-cache (postgresql).
            build_span(
                started_ago_s=10,
                duration_s=1,
                trace_id=t2_trace_id,
                span_id=t2_root_span_id,
                parent_span_id="",
                name="GET /catalog",
                kind=TracesKind.SPAN_KIND_SERVER,
                service="catalog-svc",
                attributes={"http.method": "GET", "http.route": "/catalog"},
            ),
            build_span(
                started_ago_s=9,
                duration_s=2,
                trace_id=t2_trace_id,
                span_id=t2_child_span_id,
                parent_span_id=t2_root_span_id,
                name="fetch-catalog",
                kind=TracesKind.SPAN_KIND_CLIENT,
                service="catalog-svc",
                attributes={"db.system": "redis", "db.operation": "GET"},
            ),
            build_span(
                started_ago_s=8,
                duration_s=1,
                trace_id=t2_trace_id,
                span_id=TraceIdGenerator.span_id(),
                parent_span_id=t2_child_span_id,
                name="read-cache",
                kind=TracesKind.SPAN_KIND_CLIENT,
                service="catalog-svc",
                attributes={"db.system": "postgresql", "db.operation": "SELECT"},
            ),
            # T3: a lone HTTP entry span with no redis/postgresql descendants.
            build_span(
                started_ago_s=10,
                duration_s=3,
                trace_id=TraceIdGenerator.trace_id(),
                span_id=TraceIdGenerator.span_id(),
                parent_span_id="",
                name="standalone-server",
                kind=TracesKind.SPAN_KIND_SERVER,
                service="standalone-svc",
                attributes={"http.method": "POST", "http.route": "/"},
            ),
            # T4: a Kafka consumer span without http.method; it can only match
            # through filter B in the A || B cases.
            build_span(
                started_ago_s=10,
                duration_s=1,
                trace_id=TraceIdGenerator.trace_id(),
                span_id=TraceIdGenerator.span_id(),
                parent_span_id="",
                name="isolated-worker",
                kind=TracesKind.SPAN_KIND_CONSUMER,
                service="isolated-svc",
                attributes={"messaging.system": "kafka", "messaging.destination": "orders"},
            ),
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Query window: the five minutes ending at `now`, in epoch milliseconds.
    start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
    end_ms = int(now.timestamp() * 1000)

    operator_spec: dict = {
        "name": "C",
        "expression": case["expression"],
        "returnSpansFrom": case.get("return_spans_from", ""),
        "limit": case.get("limit", 100),
    }
    # Optional parts of the spec are only sent when the case provides them.
    for case_key, spec_key in (("select_fields", "selectFields"), ("order", "order")):
        if case.get(case_key):
            operator_spec[spec_key] = case[case_key]

    builder_queries = [
        {
            "type": "builder_query",
            "spec": {"name": query_name, "signal": "traces", "filter": {"expression": case[filter_key]}, "limit": 100},
        }
        for query_name, filter_key in (("A", "filter_a"), ("B", "filter_b"))
    ]

    payload = {
        "schemaVersion": "v1",
        "start": start_ms,
        "end": end_ms,
        "requestType": "raw",
        "compositeQuery": {
            "queries": [*builder_queries, {"type": "builder_trace_operator", "spec": operator_spec}],
        },
        "formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
    }

    response = requests.post(
        signoz.self.host_configs["8080"].get("/api/v5/query_range"),
        timeout=5,
        headers={"authorization": f"Bearer {token}"},
        json=payload,
    )
    assert response.status_code == HTTPStatus.OK, f"HTTP {response.status_code}: {response.text}"
    assert case["validate"](response), f"validation failed: {response.json()}"
|
||||
Reference in New Issue
Block a user