Compare commits

...

33 Commits

Author SHA1 Message Date
Piyush Singariya
c1812ebf29 Merge branch 'main' into traceop-returnspansfrom 2026-05-25 20:15:41 +05:30
Piyush Singariya
331fe3bda8 chore: fix the comments 2026-05-25 20:15:27 +05:30
Piyush Singariya
8f1113c528 fix: use pytest param 2026-05-25 20:12:06 +05:30
Piyush Singariya
b347ce6a28 fix: tests are better 2026-05-25 20:07:16 +05:30
Piyush Singariya
d57281f0bb test: wip rewrite integration tests better 2026-05-25 11:27:49 +05:30
Piyush Singariya
08008ad813 Merge branch 'main' into traceop-returnspansfrom 2026-05-20 15:52:13 +05:30
Piyush Singariya
1454a96d4d fix: replace JOIN with IN 2026-05-20 15:51:15 +05:30
Piyush Singariya
4c02ee28de fix: tests rewritten 2026-05-20 10:41:55 +05:30
Piyush Singariya
e8befce898 Merge branch 'main' into traceop-returnspansfrom 2026-05-19 13:44:37 +05:30
Piyush Singariya
ec2bcbcbdc fix: integration tests 2026-05-19 13:44:06 +05:30
Piyush Singariya
370db055b3 chore: fmt py 2026-05-19 11:23:19 +05:30
Piyush Singariya
d197212918 chore: fmt py 2026-05-19 11:18:06 +05:30
Piyush Singariya
96b6d8646f chore: tests updated 2026-05-19 11:16:59 +05:30
Piyush Singariya
0aa6165b18 Merge branch 'main' into traceop-returnspansfrom 2026-05-19 11:12:59 +05:30
Piyush Singariya
dafa81f3b4 Merge branch 'main' into traceop-returnspansfrom 2026-05-12 21:03:16 +05:30
Piyush Singariya
a992a13f56 revert: unused test 2026-05-12 20:58:17 +05:30
Piyush Singariya
79b36abbd7 chore: comments and test 2026-05-12 20:57:00 +05:30
Piyush Singariya
181c307d1a Merge branch 'main' into traceop-returnspansfrom 2026-05-12 18:14:09 +05:30
Piyush Singariya
becdd4d3b4 revert: build list query 2026-05-12 18:11:35 +05:30
Piyush Singariya
de0311201a revert: double select 2026-05-12 17:15:41 +05:30
Piyush Singariya
1804bfe802 fix: return spans from 2026-05-12 16:53:31 +05:30
Piyush Singariya
357444c94e Merge branch 'main' into traceop 2026-05-11 20:53:51 +05:30
Piyush Singariya
a8598f3bfa fix: alias all core columns 2026-05-11 20:53:09 +05:30
Piyush Singariya
bca71f9a33 chore: remove comments 2026-05-11 16:04:32 +05:30
Piyush Singariya
c93660357d chore: fmt python 2026-05-11 16:02:18 +05:30
Piyush Singariya
5651e3b7a8 Merge branch 'main' into traceop 2026-05-11 14:28:58 +05:30
Piyush Singariya
cf2cfbc7d4 fix: remove specific of timestamp 2026-05-11 14:27:01 +05:30
Piyush Singariya
a969c38224 chore: fmtlint 2026-05-07 13:53:12 +05:30
Piyush Singariya
b892a0f0a5 chore: file rename 2026-05-07 13:51:22 +05:30
Piyush Singariya
4d47762eba chore: separate e2e test file 2026-05-07 13:50:11 +05:30
Piyush Singariya
77396a0bb3 Merge branch 'main' into traceop 2026-05-07 12:56:59 +05:30
Piyush Singariya
28c05e1bab Merge branch 'main' into traceop 2026-05-04 14:27:19 +05:30
Piyush Singariya
2b9e383994 fix: trace raw export e2e 2026-04-30 15:25:43 +05:30
4 changed files with 557 additions and 2 deletions

View File

@@ -70,12 +70,31 @@ func (b *traceOperatorCTEBuilder) build(ctx context.Context, requestType qbtypes
selectFromCTE := rootCTEName
if b.operator.ReturnSpansFrom != "" {
selectFromCTE = b.queryToCTEName[b.operator.ReturnSpansFrom]
if selectFromCTE == "" {
sourceQueryCTE := b.queryToCTEName[b.operator.ReturnSpansFrom]
if sourceQueryCTE == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
"returnSpansFrom references query '%s' which has no corresponding CTE",
b.operator.ReturnSpansFrom)
}
filteredCTEName := fmt.Sprintf("__return_from_%s", b.operator.ReturnSpansFrom)
// rootCTEName holds one row per matching *span*, not per *trace*, so it can
// contain many rows for the same trace_id. DISTINCT de-duplicates that set
// before ClickHouse builds the hash table for the IN check, keeping memory
// usage proportional to the number of distinct traces rather than spans.
matchingTracedSB := sqlbuilder.NewSelectBuilder()
matchingTracedSB.Select("DISTINCT trace_id")
matchingTracedSB.From(rootCTEName)
matchedTracesSQL, matchedTracesArgs := matchingTracedSB.BuildWithFlavor(sqlbuilder.ClickHouse)
filteredSB := sqlbuilder.NewSelectBuilder()
filteredSB.Select("*")
filteredSB.From(sourceQueryCTE)
filteredSB.Where(fmt.Sprintf("trace_id IN (%s)", matchedTracesSQL))
filteredSQL, filteredArgs := filteredSB.BuildWithFlavor(sqlbuilder.ClickHouse, matchedTracesArgs...)
b.addCTE(filteredCTEName, filteredSQL, filteredArgs, []string{sourceQueryCTE, rootCTEName})
selectFromCTE = filteredCTEName
}
finalStmt, err := b.buildFinalQuery(ctx, selectFromCTE, requestType)

View File

@@ -385,6 +385,82 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
},
expectedErr: nil,
},
{
name: "returnSpansFrom B: A -> B return B spans filtered by operator",
requestType: qbtypes.RequestTypeRaw,
operator: qbtypes.QueryBuilderTraceOperator{
Expression: "A -> B",
ReturnSpansFrom: "B",
Limit: 10,
},
compositeQuery: &qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __return_from_B AS (SELECT * FROM B WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "returnSpansFrom C: (A -> B) && C return C spans filtered by operator",
requestType: qbtypes.RequestTypeRaw,
operator: qbtypes.QueryBuilderTraceOperator{
Expression: "(A -> B) && C",
ReturnSpansFrom: "C",
Limit: 10,
},
compositeQuery: &qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "C",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'auth'"},
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __resource_filter_C AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), C AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_C) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B_AND_C AS (SELECT l.* FROM A_INDIR_DESC_B AS l INNER JOIN C AS r ON l.trace_id = r.trace_id), __return_from_C AS (SELECT * FROM C WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B_AND_C)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_C ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "auth", "%service.name%", "%service.name\":\"auth%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()

View File

@@ -72,6 +72,7 @@ class TraceOperatorQuery:
return_spans_from: str | None = None
limit: int | None = None
order: list[OrderBy] | None = None
select_fields: list[TelemetryFieldKey] | None = None
def to_dict(self) -> dict:
spec: dict[str, Any] = {
@@ -84,6 +85,8 @@ class TraceOperatorQuery:
spec["limit"] = self.limit
if self.order:
spec["order"] = [o.to_dict() if hasattr(o, "to_dict") else o for o in self.order]
if self.select_fields:
spec["selectFields"] = [f.to_dict() for f in self.select_fields]
return {"type": "builder_trace_operator", "spec": spec}

View File

@@ -0,0 +1,457 @@
"""
Integration tests for TraceOperatorQuery (builder_trace_operator) through the
/api/v5/query_range endpoint.
Covers:
1. Order-by variants (A -> B, A => B) with returnSpansFrom="A".
Guards against the NOT_FOUND_COLUMN_IN_BLOCK regression where ordering by a
column absent from an outer SELECT caused a query failure.
2. Expression operators (=>, ->, &&, ||, A NOT B) with and without returnSpansFrom.
returnSpansFrom semantics
--------------------------
returnSpansFrom="" (default)
The final rows come from the expression's root CTE. Only spans that
directly satisfy the structural predicate are returned.
returnSpansFrom="A"
The expression is still evaluated in full (the structural relationship
must hold), but the final rows are drawn from the A sub-query CTE,
filtered to traces that appeared in the expression result. Concretely:
the query returns every A span whose trace_id belongs to a trace that
matched the expression.
"""
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
def _rows(response: requests.Response) -> list:
return response.json()["data"]["data"]["results"][0].get("rows") or []
def _names(response: requests.Response) -> set:
return {r["data"]["name"] for r in _rows(response)}
def _run_case(signoz: types.SigNoz, token: str, start_ms: int, end_ms: int, case: dict) -> None:
spec: dict = {
"name": "C",
"expression": case["expression"],
"returnSpansFrom": case.get("return_spans_from", ""),
"limit": case.get("limit", 100),
}
if case.get("select_fields"):
spec["selectFields"] = case["select_fields"]
if case.get("order"):
spec["order"] = case["order"]
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "raw",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {"name": "A", "signal": "traces", "filter": {"expression": case["filter_a"]}, "limit": 100},
},
{
"type": "builder_query",
"spec": {"name": "B", "signal": "traces", "filter": {"expression": case["filter_b"]}, "limit": 100},
},
{"type": "builder_trace_operator", "spec": spec},
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK, f"HTTP {response.status_code}: {response.text}"
assert case["validate"](response), f"validation failed: {response.json()}"
# ============================================================================
# Dataset — 4 traces using real OTel semantic-convention attributes
#
# Filter A = "http.method EXISTS" (HTTP entry-point spans)
# Filter B = "db.system = 'redis'" (direct Redis cache calls)
# / "db.system = 'postgresql'" (deeper DB queries, for indirect tests)
# / "messaging.system = 'kafka'" (async consumer, for OR tests)
#
# T1 checkout-svc [SERVER] POST /checkout (5s) ← structural root, http.method=POST
# ├─ proxy-svc [SERVER] api-proxy (3s) ← http.method=POST; no db children
# └─ [CLIENT] lookup-cart (redis)
# └─ [CLIENT] check-inventory (postgresql)
#
# T2 catalog-svc [SERVER] GET /catalog (1s) ← http.method=GET
# └─ [CLIENT] fetch-catalog (redis)
# └─ [CLIENT] read-cache (postgresql)
#
# T3 standalone-svc [SERVER] standalone-server ← http.method=POST; no db/cache children
# T4 isolated-svc [CONSUMER] isolated-worker ← messaging.system=kafka; no http.method → not in A
#
# T1 has TWO spans matching filter A (http.method EXISTS); returnSpansFrom changes what is returned:
# default → only spans that directly satisfy the structural predicate
# return_A → all matching A spans from traces where the predicate held
#
# Expression truth table:
# A -> B (indirect) A=http.method EXISTS B=db.system='postgresql' T1✓ T2✓ T3✗ T4✗
# A => B (direct) A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
# A && B A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
# A || B A=http.method EXISTS B=messaging.system='kafka' T1✓ T2✓ T3✓ T4✓
# A NOT B A=http.method EXISTS B=db.system='redis' T1✗ T2✗ T3✓ T4✗
#
# Order-by cases (all use returnSpansFrom=A, 3 rows expected):
# ob.indirect A->B order http.method DESC → POST(checkout+proxy), GET(catalog)
# ob.duration A=>B order duration_nano DESC → POST/checkout(5s), api-proxy(3s), GET/catalog(1s)
# ob.select A=>B order http.method DESC → POST, POST, GET
# ============================================================================
@pytest.mark.parametrize(
"case",
[
# ── Order-by: http.method DESC, NOT in selectFields ──────────────────────
# Guards against NOT_FOUND_COLUMN_IN_BLOCK: ordering by a column absent from
# the outer SELECT used to cause a ClickHouse query failure.
# returnSpansFrom="A" returns all T1 http.method spans: POST /checkout and api-proxy
# (both http.method="POST", services checkout-svc and proxy-svc), plus
# GET /catalog (http.method="GET") from T2.
# The two POST spans are tied so their relative order is undefined; catalog-svc
# (GET) is guaranteed to sort last.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'postgresql'",
"expression": "A -> B",
"return_spans_from": "A",
"select_fields": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
"validate": lambda r: (
len(_rows(r)) == 3
and {_rows(r)[0]["data"]["service.name"], _rows(r)[1]["data"]["service.name"]} == {"checkout-svc", "proxy-svc"}
and _rows(r)[2]["data"]["service.name"] == "catalog-svc"
),
},
id="ob.indirect.http_method_not_in_select",
),
# ── Order-by: duration_nano DESC, core span field ─────────────────────────
# returnSpansFrom="A" includes api-proxy (3 s) in T1's result.
# Order: POST /checkout (5 s) > api-proxy (3 s) > GET /catalog (1 s).
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "A",
"order": [{"key": {"name": "duration_nano", "fieldContext": "span"}, "direction": "desc"}],
"validate": lambda r: (
len(_rows(r)) == 3
and _rows(r)[0]["data"]["name"] == "POST /checkout"
and _rows(r)[1]["data"]["name"] == "api-proxy"
and _rows(r)[2]["data"]["name"] == "GET /catalog"
),
},
id="ob.duration.duration_nano_desc",
),
# ── Order-by: http.method DESC, IS in selectFields ────────────────────────
# http.method is selected so it appears in each result row.
# Both POST /checkout and api-proxy carry http.method="POST"; their relative
# order is undefined. GET /catalog ("GET") always sorts last.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "A",
"select_fields": [{"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}],
"order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
"validate": lambda r: (
len(_rows(r)) == 3
and _rows(r)[0]["data"]["http.method"] == "POST"
and _rows(r)[1]["data"]["http.method"] == "POST"
and _rows(r)[2]["data"]["http.method"] == "GET"
),
},
id="ob.select.http_method_in_select",
),
# ── A => B (direct child), returnSpansFrom="" ─────────────────────────────
# POST /checkout directly parents lookup-cart (redis); api-proxy has no redis child.
# T3 does not match (no redis descendant). Default returns only the satisfying A spans.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "",
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog"},
},
id="ex.direct_child.default",
),
# ── A => B (direct child), returnSpansFrom="A" ────────────────────────────
# T1 matches; return_A pulls all T1 http.method spans → api-proxy is included too.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "A",
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
},
id="ex.direct_child.return_A",
),
# ── A -> B (indirect descendant), returnSpansFrom="" ──────────────────────
# POST /checkout is an ancestor of check-inventory (postgresql) via lookup-cart.
# api-proxy has no postgresql descendants. T3 has no postgresql descendant.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'postgresql'",
"expression": "A -> B",
"return_spans_from": "",
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog"},
},
id="ex.indirect_descendant.default",
),
# ── A -> B (indirect descendant), returnSpansFrom="A" ────────────────────
# T1 matches; return_A pulls all T1 http.method spans → api-proxy is included too.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'postgresql'",
"expression": "A -> B",
"return_spans_from": "A",
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
},
id="ex.indirect_descendant.return_A",
),
# ── A && B (both present in same trace), returnSpansFrom="" ───────────────
# T1 and T2 match (each trace has http.method spans AND redis spans); T3 does not.
# A && B returns all A spans from matching traces — api-proxy is included
# because it shares T1's trace_id with POST /checkout.
# (return_A produces the same set by definition; no separate case needed.)
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A && B",
"return_spans_from": "",
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
},
id="ex.and.default",
),
# ── A || B (either present), returnSpansFrom="" ───────────────────────────
# T1, T2, T3 match via A (http.method spans); T4 matches via B (kafka span).
# Default returns UNION of all A and B spans from matching traces.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "messaging.system = 'kafka'",
"expression": "A || B",
"return_spans_from": "",
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server", "isolated-worker"},
},
id="ex.or.default",
),
# ── A || B, returnSpansFrom="A" ───────────────────────────────────────────
# All four traces match; only A spans are returned.
# T4 has no http.method span, so it contributes nothing to A.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "messaging.system = 'kafka'",
"expression": "A || B",
"return_spans_from": "A",
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server"},
},
id="ex.or.return_A",
),
# ── A NOT B (A present, B absent from trace), returnSpansFrom="" ─────────
# T1 and T2 do NOT match: their traces contain redis spans.
# T3 MATCHES: has http.method span but no redis span in its trace.
# T4 has no http.method span, so it cannot contribute an A span.
# (return_A produces the same set; no separate case needed.)
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A NOT B",
"return_spans_from": "",
"validate": lambda r: _names(r) == {"standalone-server"},
},
id="ex.not.default",
),
],
)
def test_trace_operator(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
case: dict,
) -> None:
t1_trace_id = TraceIdGenerator.trace_id()
t1_checkout_span_id = TraceIdGenerator.span_id() # POST /checkout — structural root of T1
t1_child_span_id = TraceIdGenerator.span_id() # lookup-cart
t2_trace_id = TraceIdGenerator.trace_id()
t2_root_span_id = TraceIdGenerator.span_id()
t2_child_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
insert_traces(
[
# T1 — two http.method spans in the same trace, modelling a real proxy+service pair.
# POST /checkout (checkout-svc) is the single structural root (parent_span_id="").
# api-proxy (proxy-svc) is a structural child of POST /checkout but also has
# http.method set, so it matches filter A alongside POST /checkout.
# Both carry http.method="POST" — they differ only in service.name.
# This is what makes returnSpansFrom="" and returnSpansFrom="A" distinct:
# default → only POST /checkout satisfies A => B or A -> B
# return_A → api-proxy is pulled in too (all A spans from the matching trace)
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=5),
trace_id=t1_trace_id,
span_id=t1_checkout_span_id,
parent_span_id="",
name="POST /checkout",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "checkout-svc"},
attributes={"http.method": "POST", "http.route": "/checkout"},
),
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=3),
trace_id=t1_trace_id,
span_id=TraceIdGenerator.span_id(),
parent_span_id=t1_checkout_span_id,
name="api-proxy",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "proxy-svc"},
attributes={"http.method": "POST", "http.route": "/proxy"},
),
Traces(
timestamp=now - timedelta(seconds=9),
duration=timedelta(seconds=2),
trace_id=t1_trace_id,
span_id=t1_child_span_id,
parent_span_id=t1_checkout_span_id,
name="lookup-cart",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "checkout-svc"},
attributes={"db.system": "redis", "db.operation": "GET"},
),
Traces(
timestamp=now - timedelta(seconds=8),
duration=timedelta(seconds=1),
trace_id=t1_trace_id,
span_id=TraceIdGenerator.span_id(),
parent_span_id=t1_child_span_id,
name="check-inventory",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "checkout-svc"},
attributes={"db.system": "postgresql", "db.operation": "SELECT"},
),
# T2 — catalog-svc: GET /catalog (1 s root) → fetch-catalog (redis) → read-cache (postgresql)
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=t2_trace_id,
span_id=t2_root_span_id,
parent_span_id="",
name="GET /catalog",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "catalog-svc"},
attributes={"http.method": "GET", "http.route": "/catalog"},
),
Traces(
timestamp=now - timedelta(seconds=9),
duration=timedelta(seconds=2),
trace_id=t2_trace_id,
span_id=t2_child_span_id,
parent_span_id=t2_root_span_id,
name="fetch-catalog",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "catalog-svc"},
attributes={"db.system": "redis", "db.operation": "GET"},
),
Traces(
timestamp=now - timedelta(seconds=8),
duration=timedelta(seconds=1),
trace_id=t2_trace_id,
span_id=TraceIdGenerator.span_id(),
parent_span_id=t2_child_span_id,
name="read-cache",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "catalog-svc"},
attributes={"db.system": "postgresql", "db.operation": "SELECT"},
),
# T3 — standalone-svc: HTTP entry span with no downstream calls.
# Fails A => B / A -> B / A && B (no redis/postgresql descendant).
# Matches A NOT B (has http.method span, no redis child).
# Contributes to A || B via A.
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=3),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="standalone-server",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "standalone-svc"},
attributes={"http.method": "POST", "http.route": "/"},
),
# T4 — isolated-svc: Kafka consumer; no http.method so it never matches filter A.
# Used only as the B side of A || B to prove the OR operator matches via B.
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="isolated-worker",
kind=TracesKind.SPAN_KIND_CONSUMER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "isolated-svc"},
attributes={"messaging.system": "kafka", "messaging.destination": "orders"},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
_run_case(signoz, token, start_ms, end_ms, case)