mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-25 19:30:33 +01:00
Compare commits
33 Commits
emdash/tra
...
traceop-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c1812ebf29 | ||
|
|
331fe3bda8 | ||
|
|
8f1113c528 | ||
|
|
b347ce6a28 | ||
|
|
d57281f0bb | ||
|
|
08008ad813 | ||
|
|
1454a96d4d | ||
|
|
4c02ee28de | ||
|
|
e8befce898 | ||
|
|
ec2bcbcbdc | ||
|
|
370db055b3 | ||
|
|
d197212918 | ||
|
|
96b6d8646f | ||
|
|
0aa6165b18 | ||
|
|
dafa81f3b4 | ||
|
|
a992a13f56 | ||
|
|
79b36abbd7 | ||
|
|
181c307d1a | ||
|
|
becdd4d3b4 | ||
|
|
de0311201a | ||
|
|
1804bfe802 | ||
|
|
357444c94e | ||
|
|
a8598f3bfa | ||
|
|
bca71f9a33 | ||
|
|
c93660357d | ||
|
|
5651e3b7a8 | ||
|
|
cf2cfbc7d4 | ||
|
|
a969c38224 | ||
|
|
b892a0f0a5 | ||
|
|
4d47762eba | ||
|
|
77396a0bb3 | ||
|
|
28c05e1bab | ||
|
|
2b9e383994 |
@@ -70,12 +70,31 @@ func (b *traceOperatorCTEBuilder) build(ctx context.Context, requestType qbtypes
|
||||
|
||||
selectFromCTE := rootCTEName
|
||||
if b.operator.ReturnSpansFrom != "" {
|
||||
selectFromCTE = b.queryToCTEName[b.operator.ReturnSpansFrom]
|
||||
if selectFromCTE == "" {
|
||||
sourceQueryCTE := b.queryToCTEName[b.operator.ReturnSpansFrom]
|
||||
if sourceQueryCTE == "" {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
|
||||
"returnSpansFrom references query '%s' which has no corresponding CTE",
|
||||
b.operator.ReturnSpansFrom)
|
||||
}
|
||||
filteredCTEName := fmt.Sprintf("__return_from_%s", b.operator.ReturnSpansFrom)
|
||||
|
||||
// rootCTEName holds one row per matching *span*, not per *trace*, so it can
|
||||
// contain many rows for the same trace_id. DISTINCT de-duplicates that set
|
||||
// before ClickHouse builds the hash table for the IN check, keeping memory
|
||||
// usage proportional to the number of distinct traces rather than spans.
|
||||
matchingTracedSB := sqlbuilder.NewSelectBuilder()
|
||||
matchingTracedSB.Select("DISTINCT trace_id")
|
||||
matchingTracedSB.From(rootCTEName)
|
||||
matchedTracesSQL, matchedTracesArgs := matchingTracedSB.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
filteredSB := sqlbuilder.NewSelectBuilder()
|
||||
filteredSB.Select("*")
|
||||
filteredSB.From(sourceQueryCTE)
|
||||
filteredSB.Where(fmt.Sprintf("trace_id IN (%s)", matchedTracesSQL))
|
||||
filteredSQL, filteredArgs := filteredSB.BuildWithFlavor(sqlbuilder.ClickHouse, matchedTracesArgs...)
|
||||
|
||||
b.addCTE(filteredCTEName, filteredSQL, filteredArgs, []string{sourceQueryCTE, rootCTEName})
|
||||
selectFromCTE = filteredCTEName
|
||||
}
|
||||
|
||||
finalStmt, err := b.buildFinalQuery(ctx, selectFromCTE, requestType)
|
||||
|
||||
@@ -385,6 +385,82 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "returnSpansFrom B: A -> B return B spans filtered by operator",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
operator: qbtypes.QueryBuilderTraceOperator{
|
||||
Expression: "A -> B",
|
||||
ReturnSpansFrom: "B",
|
||||
Limit: 10,
|
||||
},
|
||||
compositeQuery: &qbtypes.CompositeQuery{
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "A",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "B",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __return_from_B AS (SELECT * FROM B WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "returnSpansFrom C: (A -> B) && C return C spans filtered by operator",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
operator: qbtypes.QueryBuilderTraceOperator{
|
||||
Expression: "(A -> B) && C",
|
||||
ReturnSpansFrom: "C",
|
||||
Limit: 10,
|
||||
},
|
||||
compositeQuery: &qbtypes.CompositeQuery{
|
||||
Queries: []qbtypes.QueryEnvelope{
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "A",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "B",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: qbtypes.QueryTypeBuilder,
|
||||
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Name: "C",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
Filter: &qbtypes.Filter{Expression: "service.name = 'auth'"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __resource_filter_C AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), C AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_C) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B_AND_C AS (SELECT l.* FROM A_INDIR_DESC_B AS l INNER JOIN C AS r ON l.trace_id = r.trace_id), __return_from_C AS (SELECT * FROM C WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B_AND_C)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_C ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "auth", "%service.name%", "%service.name\":\"auth%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
|
||||
3
tests/fixtures/querier.py
vendored
3
tests/fixtures/querier.py
vendored
@@ -72,6 +72,7 @@ class TraceOperatorQuery:
|
||||
return_spans_from: str | None = None
|
||||
limit: int | None = None
|
||||
order: list[OrderBy] | None = None
|
||||
select_fields: list[TelemetryFieldKey] | None = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
spec: dict[str, Any] = {
|
||||
@@ -84,6 +85,8 @@ class TraceOperatorQuery:
|
||||
spec["limit"] = self.limit
|
||||
if self.order:
|
||||
spec["order"] = [o.to_dict() if hasattr(o, "to_dict") else o for o in self.order]
|
||||
if self.select_fields:
|
||||
spec["selectFields"] = [f.to_dict() for f in self.select_fields]
|
||||
return {"type": "builder_trace_operator", "spec": spec}
|
||||
|
||||
|
||||
|
||||
457
tests/integration/tests/querier/15_trace_operator.py
Normal file
457
tests/integration/tests/querier/15_trace_operator.py
Normal file
@@ -0,0 +1,457 @@
|
||||
"""
|
||||
Integration tests for TraceOperatorQuery (builder_trace_operator) through the
|
||||
/api/v5/query_range endpoint.
|
||||
|
||||
Covers:
|
||||
1. Order-by variants (A -> B, A => B) with returnSpansFrom="A".
|
||||
Guards against the NOT_FOUND_COLUMN_IN_BLOCK regression where ordering by a
|
||||
column absent from an outer SELECT caused a query failure.
|
||||
2. Expression operators (=>, ->, &&, ||, A NOT B) with and without returnSpansFrom.
|
||||
|
||||
returnSpansFrom semantics
|
||||
--------------------------
|
||||
returnSpansFrom="" (default)
|
||||
The final rows come from the expression's root CTE. Only spans that
|
||||
directly satisfy the structural predicate are returned.
|
||||
|
||||
returnSpansFrom="A"
|
||||
The expression is still evaluated in full (the structural relationship
|
||||
must hold), but the final rows are drawn from the A sub-query CTE,
|
||||
filtered to traces that appeared in the expression result. Concretely:
|
||||
the query returns every A span whose trace_id belongs to a trace that
|
||||
matched the expression.
|
||||
"""
|
||||
|
||||
from collections.abc import Callable
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from http import HTTPStatus
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
|
||||
|
||||
|
||||
def _rows(response: requests.Response) -> list:
|
||||
return response.json()["data"]["data"]["results"][0].get("rows") or []
|
||||
|
||||
|
||||
def _names(response: requests.Response) -> set:
|
||||
return {r["data"]["name"] for r in _rows(response)}
|
||||
|
||||
|
||||
def _run_case(signoz: types.SigNoz, token: str, start_ms: int, end_ms: int, case: dict) -> None:
|
||||
spec: dict = {
|
||||
"name": "C",
|
||||
"expression": case["expression"],
|
||||
"returnSpansFrom": case.get("return_spans_from", ""),
|
||||
"limit": case.get("limit", 100),
|
||||
}
|
||||
if case.get("select_fields"):
|
||||
spec["selectFields"] = case["select_fields"]
|
||||
if case.get("order"):
|
||||
spec["order"] = case["order"]
|
||||
|
||||
response = requests.post(
|
||||
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
|
||||
timeout=5,
|
||||
headers={"authorization": f"Bearer {token}"},
|
||||
json={
|
||||
"schemaVersion": "v1",
|
||||
"start": start_ms,
|
||||
"end": end_ms,
|
||||
"requestType": "raw",
|
||||
"compositeQuery": {
|
||||
"queries": [
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {"name": "A", "signal": "traces", "filter": {"expression": case["filter_a"]}, "limit": 100},
|
||||
},
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {"name": "B", "signal": "traces", "filter": {"expression": case["filter_b"]}, "limit": 100},
|
||||
},
|
||||
{"type": "builder_trace_operator", "spec": spec},
|
||||
]
|
||||
},
|
||||
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
|
||||
},
|
||||
)
|
||||
assert response.status_code == HTTPStatus.OK, f"HTTP {response.status_code}: {response.text}"
|
||||
assert case["validate"](response), f"validation failed: {response.json()}"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Dataset — 4 traces using real OTel semantic-convention attributes
|
||||
#
|
||||
# Filter A = "http.method EXISTS" (HTTP entry-point spans)
|
||||
# Filter B = "db.system = 'redis'" (direct Redis cache calls)
|
||||
# / "db.system = 'postgresql'" (deeper DB queries, for indirect tests)
|
||||
# / "messaging.system = 'kafka'" (async consumer, for OR tests)
|
||||
#
|
||||
# T1 checkout-svc [SERVER] POST /checkout (5s) ← structural root, http.method=POST
|
||||
# ├─ proxy-svc [SERVER] api-proxy (3s) ← http.method=POST; no db children
|
||||
# └─ [CLIENT] lookup-cart (redis)
|
||||
# └─ [CLIENT] check-inventory (postgresql)
|
||||
#
|
||||
# T2 catalog-svc [SERVER] GET /catalog (1s) ← http.method=GET
|
||||
# └─ [CLIENT] fetch-catalog (redis)
|
||||
# └─ [CLIENT] read-cache (postgresql)
|
||||
#
|
||||
# T3 standalone-svc [SERVER] standalone-server ← http.method=POST; no db/cache children
|
||||
# T4 isolated-svc [CONSUMER] isolated-worker ← messaging.system=kafka; no http.method → not in A
|
||||
#
|
||||
# T1 has TWO spans matching filter A (http.method EXISTS); returnSpansFrom changes what is returned:
|
||||
# default → only spans that directly satisfy the structural predicate
|
||||
# return_A → all matching A spans from traces where the predicate held
|
||||
#
|
||||
# Expression truth table:
|
||||
# A -> B (indirect) A=http.method EXISTS B=db.system='postgresql' T1✓ T2✓ T3✗ T4✗
|
||||
# A => B (direct) A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
|
||||
# A && B A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
|
||||
# A || B A=http.method EXISTS B=messaging.system='kafka' T1✓ T2✓ T3✓ T4✓
|
||||
# A NOT B A=http.method EXISTS B=db.system='redis' T1✗ T2✗ T3✓ T4✗
|
||||
#
|
||||
# Order-by cases (all use returnSpansFrom=A, 3 rows expected):
|
||||
# ob.indirect A->B order http.method DESC → POST(checkout+proxy), GET(catalog)
|
||||
# ob.duration A=>B order duration_nano DESC → POST/checkout(5s), api-proxy(3s), GET/catalog(1s)
|
||||
# ob.select A=>B order http.method DESC → POST, POST, GET
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"case",
|
||||
[
|
||||
# ── Order-by: http.method DESC, NOT in selectFields ──────────────────────
|
||||
# Guards against NOT_FOUND_COLUMN_IN_BLOCK: ordering by a column absent from
|
||||
# the outer SELECT used to cause a ClickHouse query failure.
|
||||
# returnSpansFrom="A" returns all T1 http.method spans: POST /checkout and api-proxy
|
||||
# (both http.method="POST", services checkout-svc and proxy-svc), plus
|
||||
# GET /catalog (http.method="GET") from T2.
|
||||
# The two POST spans are tied so their relative order is undefined; catalog-svc
|
||||
# (GET) is guaranteed to sort last.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'postgresql'",
|
||||
"expression": "A -> B",
|
||||
"return_spans_from": "A",
|
||||
"select_fields": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
|
||||
"order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
|
||||
"validate": lambda r: (
|
||||
len(_rows(r)) == 3
|
||||
and {_rows(r)[0]["data"]["service.name"], _rows(r)[1]["data"]["service.name"]} == {"checkout-svc", "proxy-svc"}
|
||||
and _rows(r)[2]["data"]["service.name"] == "catalog-svc"
|
||||
),
|
||||
},
|
||||
id="ob.indirect.http_method_not_in_select",
|
||||
),
|
||||
# ── Order-by: duration_nano DESC, core span field ─────────────────────────
|
||||
# returnSpansFrom="A" includes api-proxy (3 s) in T1's result.
|
||||
# Order: POST /checkout (5 s) > api-proxy (3 s) > GET /catalog (1 s).
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A => B",
|
||||
"return_spans_from": "A",
|
||||
"order": [{"key": {"name": "duration_nano", "fieldContext": "span"}, "direction": "desc"}],
|
||||
"validate": lambda r: (
|
||||
len(_rows(r)) == 3
|
||||
and _rows(r)[0]["data"]["name"] == "POST /checkout"
|
||||
and _rows(r)[1]["data"]["name"] == "api-proxy"
|
||||
and _rows(r)[2]["data"]["name"] == "GET /catalog"
|
||||
),
|
||||
},
|
||||
id="ob.duration.duration_nano_desc",
|
||||
),
|
||||
# ── Order-by: http.method DESC, IS in selectFields ────────────────────────
|
||||
# http.method is selected so it appears in each result row.
|
||||
# Both POST /checkout and api-proxy carry http.method="POST"; their relative
|
||||
# order is undefined. GET /catalog ("GET") always sorts last.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A => B",
|
||||
"return_spans_from": "A",
|
||||
"select_fields": [{"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}],
|
||||
"order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
|
||||
"validate": lambda r: (
|
||||
len(_rows(r)) == 3
|
||||
and _rows(r)[0]["data"]["http.method"] == "POST"
|
||||
and _rows(r)[1]["data"]["http.method"] == "POST"
|
||||
and _rows(r)[2]["data"]["http.method"] == "GET"
|
||||
),
|
||||
},
|
||||
id="ob.select.http_method_in_select",
|
||||
),
|
||||
# ── A => B (direct child), returnSpansFrom="" ─────────────────────────────
|
||||
# POST /checkout directly parents lookup-cart (redis); api-proxy has no redis child.
|
||||
# T3 does not match (no redis descendant). Default returns only the satisfying A spans.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A => B",
|
||||
"return_spans_from": "",
|
||||
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog"},
|
||||
},
|
||||
id="ex.direct_child.default",
|
||||
),
|
||||
# ── A => B (direct child), returnSpansFrom="A" ────────────────────────────
|
||||
# T1 matches; return_A pulls all T1 http.method spans → api-proxy is included too.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A => B",
|
||||
"return_spans_from": "A",
|
||||
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
|
||||
},
|
||||
id="ex.direct_child.return_A",
|
||||
),
|
||||
# ── A -> B (indirect descendant), returnSpansFrom="" ──────────────────────
|
||||
# POST /checkout is an ancestor of check-inventory (postgresql) via lookup-cart.
|
||||
# api-proxy has no postgresql descendants. T3 has no postgresql descendant.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'postgresql'",
|
||||
"expression": "A -> B",
|
||||
"return_spans_from": "",
|
||||
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog"},
|
||||
},
|
||||
id="ex.indirect_descendant.default",
|
||||
),
|
||||
# ── A -> B (indirect descendant), returnSpansFrom="A" ────────────────────
|
||||
# T1 matches; return_A pulls all T1 http.method spans → api-proxy is included too.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'postgresql'",
|
||||
"expression": "A -> B",
|
||||
"return_spans_from": "A",
|
||||
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
|
||||
},
|
||||
id="ex.indirect_descendant.return_A",
|
||||
),
|
||||
# ── A && B (both present in same trace), returnSpansFrom="" ───────────────
|
||||
# T1 and T2 match (each trace has http.method spans AND redis spans); T3 does not.
|
||||
# A && B returns all A spans from matching traces — api-proxy is included
|
||||
# because it shares T1's trace_id with POST /checkout.
|
||||
# (return_A produces the same set by definition; no separate case needed.)
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A && B",
|
||||
"return_spans_from": "",
|
||||
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
|
||||
},
|
||||
id="ex.and.default",
|
||||
),
|
||||
# ── A || B (either present), returnSpansFrom="" ───────────────────────────
|
||||
# T1, T2, T3 match via A (http.method spans); T4 matches via B (kafka span).
|
||||
# Default returns UNION of all A and B spans from matching traces.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "messaging.system = 'kafka'",
|
||||
"expression": "A || B",
|
||||
"return_spans_from": "",
|
||||
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server", "isolated-worker"},
|
||||
},
|
||||
id="ex.or.default",
|
||||
),
|
||||
# ── A || B, returnSpansFrom="A" ───────────────────────────────────────────
|
||||
# All four traces match; only A spans are returned.
|
||||
# T4 has no http.method span, so it contributes nothing to A.
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "messaging.system = 'kafka'",
|
||||
"expression": "A || B",
|
||||
"return_spans_from": "A",
|
||||
"validate": lambda r: _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server"},
|
||||
},
|
||||
id="ex.or.return_A",
|
||||
),
|
||||
# ── A NOT B (A present, B absent from trace), returnSpansFrom="" ─────────
|
||||
# T1 and T2 do NOT match: their traces contain redis spans.
|
||||
# T3 MATCHES: has http.method span but no redis span in its trace.
|
||||
# T4 has no http.method span, so it cannot contribute an A span.
|
||||
# (return_A produces the same set; no separate case needed.)
|
||||
pytest.param(
|
||||
{
|
||||
"filter_a": "http.method EXISTS",
|
||||
"filter_b": "db.system = 'redis'",
|
||||
"expression": "A NOT B",
|
||||
"return_spans_from": "",
|
||||
"validate": lambda r: _names(r) == {"standalone-server"},
|
||||
},
|
||||
id="ex.not.default",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_trace_operator(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
case: dict,
|
||||
) -> None:
|
||||
t1_trace_id = TraceIdGenerator.trace_id()
|
||||
t1_checkout_span_id = TraceIdGenerator.span_id() # POST /checkout — structural root of T1
|
||||
t1_child_span_id = TraceIdGenerator.span_id() # lookup-cart
|
||||
|
||||
t2_trace_id = TraceIdGenerator.trace_id()
|
||||
t2_root_span_id = TraceIdGenerator.span_id()
|
||||
t2_child_span_id = TraceIdGenerator.span_id()
|
||||
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
|
||||
insert_traces(
|
||||
[
|
||||
# T1 — two http.method spans in the same trace, modelling a real proxy+service pair.
|
||||
# POST /checkout (checkout-svc) is the single structural root (parent_span_id="").
|
||||
# api-proxy (proxy-svc) is a structural child of POST /checkout but also has
|
||||
# http.method set, so it matches filter A alongside POST /checkout.
|
||||
# Both carry http.method="POST" — they differ only in service.name.
|
||||
# This is what makes returnSpansFrom="" and returnSpansFrom="A" distinct:
|
||||
# default → only POST /checkout satisfies A => B or A -> B
|
||||
# return_A → api-proxy is pulled in too (all A spans from the matching trace)
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=5),
|
||||
trace_id=t1_trace_id,
|
||||
span_id=t1_checkout_span_id,
|
||||
parent_span_id="",
|
||||
name="POST /checkout",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "checkout-svc"},
|
||||
attributes={"http.method": "POST", "http.route": "/checkout"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=3),
|
||||
trace_id=t1_trace_id,
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
parent_span_id=t1_checkout_span_id,
|
||||
name="api-proxy",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "proxy-svc"},
|
||||
attributes={"http.method": "POST", "http.route": "/proxy"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=9),
|
||||
duration=timedelta(seconds=2),
|
||||
trace_id=t1_trace_id,
|
||||
span_id=t1_child_span_id,
|
||||
parent_span_id=t1_checkout_span_id,
|
||||
name="lookup-cart",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "checkout-svc"},
|
||||
attributes={"db.system": "redis", "db.operation": "GET"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=8),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=t1_trace_id,
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
parent_span_id=t1_child_span_id,
|
||||
name="check-inventory",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "checkout-svc"},
|
||||
attributes={"db.system": "postgresql", "db.operation": "SELECT"},
|
||||
),
|
||||
# T2 — catalog-svc: GET /catalog (1 s root) → fetch-catalog (redis) → read-cache (postgresql)
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=t2_trace_id,
|
||||
span_id=t2_root_span_id,
|
||||
parent_span_id="",
|
||||
name="GET /catalog",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "catalog-svc"},
|
||||
attributes={"http.method": "GET", "http.route": "/catalog"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=9),
|
||||
duration=timedelta(seconds=2),
|
||||
trace_id=t2_trace_id,
|
||||
span_id=t2_child_span_id,
|
||||
parent_span_id=t2_root_span_id,
|
||||
name="fetch-catalog",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "catalog-svc"},
|
||||
attributes={"db.system": "redis", "db.operation": "GET"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=8),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=t2_trace_id,
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
parent_span_id=t2_child_span_id,
|
||||
name="read-cache",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "catalog-svc"},
|
||||
attributes={"db.system": "postgresql", "db.operation": "SELECT"},
|
||||
),
|
||||
# T3 — standalone-svc: HTTP entry span with no downstream calls.
|
||||
# Fails A => B / A -> B / A && B (no redis/postgresql descendant).
|
||||
# Matches A NOT B (has http.method span, no redis child).
|
||||
# Contributes to A || B via A.
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=3),
|
||||
trace_id=TraceIdGenerator.trace_id(),
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
parent_span_id="",
|
||||
name="standalone-server",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "standalone-svc"},
|
||||
attributes={"http.method": "POST", "http.route": "/"},
|
||||
),
|
||||
# T4 — isolated-svc: Kafka consumer; no http.method so it never matches filter A.
|
||||
# Used only as the B side of A || B to prove the OR operator matches via B.
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=10),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=TraceIdGenerator.trace_id(),
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
parent_span_id="",
|
||||
name="isolated-worker",
|
||||
kind=TracesKind.SPAN_KIND_CONSUMER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={"service.name": "isolated-svc"},
|
||||
attributes={"messaging.system": "kafka", "messaging.destination": "orders"},
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
|
||||
_run_case(signoz, token, start_ms, end_ms, case)
|
||||
Reference in New Issue
Block a user