Compare commits

..

28 Commits

Author SHA1 Message Date
Piyush Singariya
08008ad813 Merge branch 'main' into traceop-returnspansfrom 2026-05-20 15:52:13 +05:30
Piyush Singariya
1454a96d4d fix: replace JOIN with IN 2026-05-20 15:51:15 +05:30
Piyush Singariya
4c02ee28de fix: tests rewritten 2026-05-20 10:41:55 +05:30
Piyush Singariya
e8befce898 Merge branch 'main' into traceop-returnspansfrom 2026-05-19 13:44:37 +05:30
Piyush Singariya
ec2bcbcbdc fix: integration tests 2026-05-19 13:44:06 +05:30
Piyush Singariya
370db055b3 chore: fmt py 2026-05-19 11:23:19 +05:30
Piyush Singariya
d197212918 chore: fmt py 2026-05-19 11:18:06 +05:30
Piyush Singariya
96b6d8646f chore: tests updated 2026-05-19 11:16:59 +05:30
Piyush Singariya
0aa6165b18 Merge branch 'main' into traceop-returnspansfrom 2026-05-19 11:12:59 +05:30
Piyush Singariya
dafa81f3b4 Merge branch 'main' into traceop-returnspansfrom 2026-05-12 21:03:16 +05:30
Piyush Singariya
a992a13f56 revert: unused test 2026-05-12 20:58:17 +05:30
Piyush Singariya
79b36abbd7 chore: comments and test 2026-05-12 20:57:00 +05:30
Piyush Singariya
181c307d1a Merge branch 'main' into traceop-returnspansfrom 2026-05-12 18:14:09 +05:30
Piyush Singariya
becdd4d3b4 revert: build list query 2026-05-12 18:11:35 +05:30
Piyush Singariya
de0311201a revert: double select 2026-05-12 17:15:41 +05:30
Piyush Singariya
1804bfe802 fix: return spans from 2026-05-12 16:53:31 +05:30
Piyush Singariya
357444c94e Merge branch 'main' into traceop 2026-05-11 20:53:51 +05:30
Piyush Singariya
a8598f3bfa fix: alias all core columns 2026-05-11 20:53:09 +05:30
Piyush Singariya
bca71f9a33 chore: remove comments 2026-05-11 16:04:32 +05:30
Piyush Singariya
c93660357d chore: fmt python 2026-05-11 16:02:18 +05:30
Piyush Singariya
5651e3b7a8 Merge branch 'main' into traceop 2026-05-11 14:28:58 +05:30
Piyush Singariya
cf2cfbc7d4 fix: remove specific of timestamp 2026-05-11 14:27:01 +05:30
Piyush Singariya
a969c38224 chore: fmtlint 2026-05-07 13:53:12 +05:30
Piyush Singariya
b892a0f0a5 chore: file rename 2026-05-07 13:51:22 +05:30
Piyush Singariya
4d47762eba chore: separate e2e test file 2026-05-07 13:50:11 +05:30
Piyush Singariya
77396a0bb3 Merge branch 'main' into traceop 2026-05-07 12:56:59 +05:30
Piyush Singariya
28c05e1bab Merge branch 'main' into traceop 2026-05-04 14:27:19 +05:30
Piyush Singariya
2b9e383994 fix: trace raw export e2e 2026-04-30 15:25:43 +05:30
11 changed files with 408 additions and 167 deletions

View File

@@ -45,15 +45,9 @@ jobs:
(github.event_name == 'pull_request_target' && contains(github.event.pull_request.labels.*.name, 'safe-to-test'))) && contains(github.event.pull_request.labels.*.name, 'safe-to-e2e')
runs-on: ubuntu-latest
timeout-minutes: 30
env:
SIGNOZ_BUILDX_GHA_SCOPE: signoz-e2e
steps:
- name: checkout
uses: actions/checkout@v4
- name: setup-buildx
uses: docker/setup-buildx-action@v3
- name: expose-gha-runtime
uses: crazy-max/ghaction-github-runtime@v3
- name: python
uses: actions/setup-python@v5
with:

View File

@@ -69,15 +69,9 @@ jobs:
((github.event_name == 'pull_request' && ! github.event.pull_request.head.repo.fork && github.event.pull_request.user.login != 'dependabot[bot]' && ! contains(github.event.pull_request.labels.*.name, 'safe-to-test')) ||
(github.event_name == 'pull_request_target' && contains(github.event.pull_request.labels.*.name, 'safe-to-test'))) && contains(github.event.pull_request.labels.*.name, 'safe-to-integrate')
runs-on: ubuntu-latest
env:
SIGNOZ_BUILDX_GHA_SCOPE: signoz-integration
steps:
- name: checkout
uses: actions/checkout@v4
- name: setup-buildx
uses: docker/setup-buildx-action@v3
- name: expose-gha-runtime
uses: crazy-max/ghaction-github-runtime@v3
- name: python
uses: actions/setup-python@v5
with:

View File

@@ -34,7 +34,6 @@ DOCKER_BUILD_ARCHS_ENTERPRISE = $(addprefix docker-build-enterprise-,$(ARCHS))
DOCKERFILE_ENTERPRISE = $(SRC)/cmd/enterprise/Dockerfile
DOCKER_REGISTRY_ENTERPRISE ?= docker.io/signoz/signoz
JS_BUILD_CONTEXT = $(SRC)/frontend
DOCKER_BUILDX_PRUNE_FLAGS ?= --force
##############################################################
# directories
@@ -230,21 +229,6 @@ py-clean: ## Clear all pycache and pytest cache from tests directory recursively
@find tests -type f -name "*.pyo" -delete 2>/dev/null || true
@echo ">> python cache cleaned"
.PHONY: py-docker-clean
py-docker-clean: ## Remove Docker image and build caches used by python integration tests
@echo ">> removing SigNoz integration test image"
@docker image rm -f signoz:integration 2>/dev/null || true
@echo ">> removing local integration buildx cache directories"
@rm -rf /tmp/signoz-integration-buildx-cache /tmp/signoz-integration-buildx-cache-next /tmp/signoz-e2e-buildx-cache /tmp/signoz-e2e-buildx-cache-next
@echo ">> pruning docker buildx cache with flags: $(DOCKER_BUILDX_PRUNE_FLAGS)"
@docker buildx prune $(DOCKER_BUILDX_PRUNE_FLAGS)
.PHONY: py-test-clean
py-test-clean: ## Tear down python test stack and remove python/Docker test caches
@$(MAKE) py-test-teardown || true
@$(MAKE) py-clean
@$(MAKE) py-docker-clean
##############################################################
# generate commands

View File

@@ -1,5 +1,3 @@
# syntax=docker/dockerfile:1.13
FROM golang:1.25-bookworm
ARG OS="linux"
@@ -23,8 +21,7 @@ RUN set -eux; \
COPY go.mod go.sum ./
RUN --mount=type=cache,target=/go/pkg/mod \
go mod download
RUN go mod download
COPY ./cmd/ ./cmd/
COPY ./ee/ ./ee/
@@ -32,9 +29,7 @@ COPY ./pkg/ ./pkg/
COPY ./templates/email /root/templates
COPY Makefile Makefile
RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
TARGET_DIR=/root ARCHS=${TARGETARCH} ZEUS_URL=${ZEUSURL} LICENSE_URL=${ZEUSURL}/api/v1 make go-build-enterprise-race
RUN TARGET_DIR=/root ARCHS=${TARGETARCH} ZEUS_URL=${ZEUSURL} LICENSE_URL=${ZEUSURL}/api/v1 make go-build-enterprise-race
RUN mv /root/linux-${TARGETARCH}/signoz /root/signoz
RUN chmod 755 /root /root/signoz

View File

@@ -1,5 +1,3 @@
# syntax=docker/dockerfile:1.13
FROM node:22-bookworm AS build
WORKDIR /opt/
@@ -32,8 +30,7 @@ RUN set -eux; \
COPY go.mod go.sum ./
RUN --mount=type=cache,target=/go/pkg/mod \
go mod download
RUN go mod download
COPY ./cmd/ ./cmd/
COPY ./ee/ ./ee/
@@ -41,9 +38,7 @@ COPY ./pkg/ ./pkg/
COPY ./templates/email /root/templates
COPY Makefile Makefile
RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
TARGET_DIR=/root ARCHS=${TARGETARCH} ZEUS_URL=${ZEUSURL} LICENSE_URL=${ZEUSURL}/api/v1 make go-build-enterprise-race
RUN TARGET_DIR=/root ARCHS=${TARGETARCH} ZEUS_URL=${ZEUSURL} LICENSE_URL=${ZEUSURL}/api/v1 make go-build-enterprise-race
RUN mv /root/linux-${TARGETARCH}/signoz /root/signoz
COPY --from=build /opt/build ./web/

View File

@@ -70,12 +70,31 @@ func (b *traceOperatorCTEBuilder) build(ctx context.Context, requestType qbtypes
selectFromCTE := rootCTEName
if b.operator.ReturnSpansFrom != "" {
selectFromCTE = b.queryToCTEName[b.operator.ReturnSpansFrom]
if selectFromCTE == "" {
sourceQueryCTE := b.queryToCTEName[b.operator.ReturnSpansFrom]
if sourceQueryCTE == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
"returnSpansFrom references query '%s' which has no corresponding CTE",
b.operator.ReturnSpansFrom)
}
filteredCTEName := fmt.Sprintf("__return_from_%s", b.operator.ReturnSpansFrom)
// rootCTEName holds one row per matching *span*, not per *trace*, so it can
// contain many rows for the same trace_id. DISTINCT de-duplicates that set
// before ClickHouse builds the hash table for the IN check, keeping memory
// usage proportional to the number of distinct traces rather than spans.
matchingTracedSB := sqlbuilder.NewSelectBuilder()
matchingTracedSB.Select("DISTINCT trace_id")
matchingTracedSB.From(rootCTEName)
matchedTracesSQL, matchedTracesArgs := matchingTracedSB.BuildWithFlavor(sqlbuilder.ClickHouse)
filteredSB := sqlbuilder.NewSelectBuilder()
filteredSB.Select("*")
filteredSB.From(sourceQueryCTE)
filteredSB.Where(fmt.Sprintf("trace_id IN (%s)", matchedTracesSQL))
filteredSQL, filteredArgs := filteredSB.BuildWithFlavor(sqlbuilder.ClickHouse, matchedTracesArgs...)
b.addCTE(filteredCTEName, filteredSQL, filteredArgs, []string{sourceQueryCTE, rootCTEName})
selectFromCTE = filteredCTEName
}
finalStmt, err := b.buildFinalQuery(ctx, selectFromCTE, requestType)

View File

@@ -385,6 +385,82 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
},
expectedErr: nil,
},
{
name: "returnSpansFrom B: A -> B return B spans filtered by operator",
requestType: qbtypes.RequestTypeRaw,
operator: qbtypes.QueryBuilderTraceOperator{
Expression: "A -> B",
ReturnSpansFrom: "B",
Limit: 10,
},
compositeQuery: &qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __return_from_B AS (SELECT * FROM B WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "returnSpansFrom C: (A -> B) && C return C spans filtered by operator",
requestType: qbtypes.RequestTypeRaw,
operator: qbtypes.QueryBuilderTraceOperator{
Expression: "(A -> B) && C",
ReturnSpansFrom: "C",
Limit: 10,
},
compositeQuery: &qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "C",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'auth'"},
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __resource_filter_C AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), C AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_C) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B_AND_C AS (SELECT l.* FROM A_INDIR_DESC_B AS l INNER JOIN C AS r ON l.trace_id = r.trace_id), __return_from_C AS (SELECT * FROM C WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B_AND_C)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_C ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "auth", "%service.name%", "%service.name\":\"auth%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()

View File

@@ -17,8 +17,6 @@ from fixtures.logger import setup_logger
logger = setup_logger(__name__)
ZEUS_NETWORK_ALIAS = "signoz-zeus"
@pytest.fixture(name="zeus", scope="package")
def zeus(
@@ -33,7 +31,6 @@ def zeus(
def create() -> types.TestContainerDocker:
container = WireMockContainer(image="wiremock/wiremock:2.35.1-1", secure=False)
container.with_network(network)
container.with_network_aliases(ZEUS_NETWORK_ALIAS)
container.start()
return types.TestContainerDocker(
@@ -45,7 +42,7 @@ def zeus(
container.get_exposed_port(8080),
)
},
container_configs={"8080": types.TestContainerUrlConfig("http", ZEUS_NETWORK_ALIAS, 8080)},
container_configs={"8080": types.TestContainerUrlConfig("http", container.get_wrapped_container().name, 8080)},
)
def delete(container: types.TestContainerDocker):

View File

@@ -72,6 +72,7 @@ class TraceOperatorQuery:
return_spans_from: str | None = None
limit: int | None = None
order: list[OrderBy] | None = None
select_fields: list[TelemetryFieldKey] | None = None
def to_dict(self) -> dict:
spec: dict[str, Any] = {
@@ -84,6 +85,8 @@ class TraceOperatorQuery:
spec["limit"] = self.limit
if self.order:
spec["order"] = [o.to_dict() if hasattr(o, "to_dict") else o for o in self.order]
if self.select_fields:
spec["selectFields"] = [f.to_dict() for f in self.select_fields]
return {"type": "builder_trace_operator", "spec": spec}

View File

@@ -1,19 +1,14 @@
import os
import platform
import shutil
import subprocess
import threading
import time
from dataclasses import dataclass
from http import HTTPStatus
from os import path
from pathlib import Path
import docker
import docker.errors
import pytest
import requests
from testcontainers.core.container import DockerContainer, Network
from testcontainers.core.image import DockerImage
from fixtures import reuse, types
from fixtures.logger import setup_logger
@@ -21,110 +16,6 @@ from fixtures.logger import setup_logger
logger = setup_logger(__name__)
@dataclass
class SigNozImageBuild:
process: subprocess.Popen
command: list[str]
cache_path: Path | None = None
next_cache_path: Path | None = None
reader: threading.Thread | None = None
def _stream_build_output(pipe, log) -> None:
try:
for line in iter(pipe.readline, ""):
log.info("buildx: %s", line.rstrip())
finally:
pipe.close()
def start_signoz_image_build(pytestconfig: pytest.Config, dockerfile_path: str, arch: str, zeus_url: str) -> SigNozImageBuild:
root = pytestconfig.rootpath.parent
command = [
"docker",
"buildx",
"build",
"--load",
"--progress",
"plain",
"--tag",
"signoz:integration",
"--file",
dockerfile_path,
"--build-arg",
f"TARGETARCH={arch}",
"--build-arg",
f"ZEUSURL={zeus_url}",
str(root),
]
cache_path = None
next_cache_path = None
if os.environ.get("ACTIONS_RUNTIME_TOKEN"):
# Running in GitHub Actions — use BuildKit's native GHA cache backend.
# Avoids the local-write races and partial exports seen with type=local.
scope = os.environ.get("SIGNOZ_BUILDX_GHA_SCOPE", "signoz-integration")
command.extend(["--cache-from", f"type=gha,scope={scope}"])
command.extend(["--cache-to", f"type=gha,scope={scope},mode=max"])
elif build_cache_dir := os.environ.get("SIGNOZ_INTEGRATION_BUILD_CACHE_DIR"):
# Local cache for developer machines / non-GHA CI.
cache_path = Path(build_cache_dir)
next_cache_path = Path(f"{build_cache_dir}-next")
cache_path.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(next_cache_path, ignore_errors=True)
if cache_path.exists():
command.extend(["--cache-from", f"type=local,src={cache_path}"])
command.extend(["--cache-to", f"type=local,dest={next_cache_path},mode=max"])
logger.info("Building SigNoz integration image with %s", " ".join(command))
process = subprocess.Popen(
command,
cwd=root,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
reader = threading.Thread(target=_stream_build_output, args=(process.stdout, logger), daemon=True)
reader.start()
return SigNozImageBuild(
process=process,
command=command,
cache_path=cache_path,
next_cache_path=next_cache_path,
reader=reader,
)
def wait_for_signoz_image_build(build: SigNozImageBuild) -> None:
returncode = build.process.wait()
if build.reader is not None:
build.reader.join(timeout=5)
if returncode != 0:
raise subprocess.CalledProcessError(returncode, build.command)
if build.cache_path and build.next_cache_path and build.next_cache_path.exists():
shutil.rmtree(build.cache_path, ignore_errors=True)
shutil.move(build.next_cache_path, build.cache_path)
def stop_signoz_image_build(build: SigNozImageBuild) -> None:
if build.process.poll() is not None:
return
build.process.terminate()
try:
build.process.wait(timeout=10)
except subprocess.TimeoutExpired:
build.process.kill()
build.process.wait()
if build.reader is not None:
build.reader.join(timeout=5)
def create_signoz(
network: Network,
zeus: types.TestContainerDocker,
@@ -142,6 +33,9 @@ def create_signoz(
"""
def create() -> types.SigNoz:
# Run the migrations for clickhouse
request.getfixturevalue("migrator")
# Get the no-web flag
with_web = pytestconfig.getoption("--with-web")
@@ -154,15 +48,19 @@ def create_signoz(
if with_web:
dockerfile_path = "cmd/enterprise/Dockerfile.with-web.integration"
# The SigNoz image build does not depend on ClickHouse migrations, so
# build it while the migrator container runs.
image_build = start_signoz_image_build(pytestconfig, dockerfile_path, arch, zeus.container_configs["8080"].base())
try:
request.getfixturevalue("migrator")
wait_for_signoz_image_build(image_build)
except Exception: # pylint: disable=broad-exception-caught
stop_signoz_image_build(image_build)
raise
# Docker build context is the repo root — one up from pytest's
# rootdir (tests/).
self = DockerImage(
path=str(pytestconfig.rootpath.parent),
dockerfile_path=dockerfile_path,
tag="signoz:integration",
buildargs={
"TARGETARCH": arch,
"ZEUSURL": zeus.container_configs["8080"].base(),
},
)
self.build()
env = (
{

View File

@@ -0,0 +1,286 @@
"""
Integration tests for TraceOperatorQuery (builder_trace_operator) through the
/api/v5/query_range endpoint.
Covers:
1. Order-by variants for trace operator (A -> B, A => B) with returnSpansFrom="A".
Guards against the NOT_FOUND_COLUMN_IN_BLOCK regression where ordering by a
column absent from an outer SELECT caused a query failure.
2. Expression operators (=>, ->, &&, ||, A NOT B) with and without returnSpansFrom.
returnSpansFrom semantics
--------------------------
returnSpansFrom="" (default)
The final rows come from the expression's root CTE. Only spans that
directly satisfy the structural predicate are returned.
returnSpansFrom="A"
The expression is still evaluated in full (the structural relationship
must hold), but the final rows are drawn from the A sub-query CTE,
filtered to traces that appeared in the expression result. Concretely:
the query returns every A span whose trace_id belongs to a trace that
matched the expression.
"""
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.querier import OrderBy, TelemetryFieldKey, TraceOperatorQuery, make_query_request
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _chain_trace(now: datetime, *spans: tuple) -> list[Traces]:
"""
Build a single trace as a linear chain.
Each span tuple is (name, service, op_type, duration_s[, extra_attrs]).
The first span is the root; each subsequent span is a child of the previous.
"""
trace_id = TraceIdGenerator.trace_id()
ids = [TraceIdGenerator.span_id() for _ in spans]
result = []
for i, s in enumerate(spans):
name, service, op_type, duration_s = s[0], s[1], s[2], s[3]
extra = s[4] if len(s) > 4 else {}
result.append(
Traces(
timestamp=now - timedelta(seconds=10 - i),
duration=timedelta(seconds=duration_s),
trace_id=trace_id,
span_id=ids[i],
parent_span_id="" if i == 0 else ids[i - 1],
name=name,
kind=TracesKind.SPAN_KIND_SERVER if i == 0 else TracesKind.SPAN_KIND_INTERNAL,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": service},
attributes={"operation.type": op_type, **extra},
)
)
return result
def _builder_query(name: str, filter_expr: str, limit: int = 100) -> dict:
return {
"type": "builder_query",
"spec": {
"name": name,
"signal": "traces",
"filter": {"expression": filter_expr},
"limit": limit,
},
}
# ---------------------------------------------------------------------------
# Order-by test
# ---------------------------------------------------------------------------
def test_trace_operator_query_order_by(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Verifies order-by behaviour for three sub-cases, all inserted once:
field_not_in_select
Order by an attribute absent from selectFields.
Guards against the NOT_FOUND_COLUMN_IN_BLOCK ClickHouse regression.
core_span_field
Order by duration_nano with no explicit selectFields.
non_core_field_in_select
Order by an attribute that IS in selectFields.
"""
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
insert_traces(
[
# field_not_in_select — two 3-level chains; differ only by http.method
*_chain_trace(
now,
("fnis-gp", "svc-a", "fnis-grandparent", 5, {"http.method": "POST"}),
("fnis-mid", "svc-a", "fnis-middle", 3),
("fnis-gc", "svc-a", "fnis-grandchild", 1),
),
*_chain_trace(
now,
("fnis-gp", "svc-b", "fnis-grandparent", 5, {"http.method": "GET"}),
("fnis-mid", "svc-b", "fnis-middle", 3),
("fnis-gc", "svc-b", "fnis-grandchild", 1),
),
# core_span_field — two parent→child chains; differ by duration
*_chain_trace(now, ("csf-parent-long", "svc-long", "csf-parent", 5), ("csf-child-long", "svc-long", "csf-child", 1)),
*_chain_trace(now, ("csf-parent-short", "svc-short", "csf-parent", 1), ("csf-child-short", "svc-short", "csf-child", 1)),
# non_core_field_in_select — two parent→child chains; differ by http.method
*_chain_trace(now, ("ncis-parent-post", "svc-post", "ncis-parent", 3, {"http.method": "POST"}), ("ncis-child-post", "svc-post", "ncis-child", 1)),
*_chain_trace(now, ("ncis-parent-get", "svc-get", "ncis-parent", 3, {"http.method": "GET"}), ("ncis-child-get", "svc-get", "ncis-child", 1)),
# noise
*_chain_trace(now, ("noise-span", "svc-noise", "noise-op", 1)),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
def check_order(case_id, filter_a, filter_b, expression, select_fields, order, expected_rows):
resp = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="raw",
queries=[
_builder_query("A", filter_a),
_builder_query("B", filter_b),
TraceOperatorQuery(name="C", expression=expression, return_spans_from="A", limit=100, select_fields=select_fields, order=order).to_dict(),
],
)
assert resp.status_code == HTTPStatus.OK, f"[{case_id}] {resp.text}"
assert resp.json()["status"] == "success"
rows = resp.json()["data"]["data"]["results"][0].get("rows") or []
assert len(rows) == len(expected_rows), f"[{case_id}] expected {len(expected_rows)} rows, got {len(rows)}"
for i, (row, expected) in enumerate(zip(rows, expected_rows)):
for key, value in expected.items():
assert row["data"].get(key) == value, f"[{case_id}] row {i}: {key}={value!r} expected, got {row['data'].get(key)!r}"
# POST > GET in DESC; order key is absent from selectFields
check_order(
"field_not_in_select",
"operation.type = 'fnis-grandparent'",
"operation.type = 'fnis-grandchild'",
"A -> B",
[TelemetryFieldKey(name="service.name", field_data_type="string", field_context="resource")],
[OrderBy(key=TelemetryFieldKey(name="http.method", field_data_type="string", field_context="attribute"), direction="desc")],
[{"service.name": "svc-a"}, {"service.name": "svc-b"}],
)
# 5 s parent before 1 s parent in DESC
check_order(
"core_span_field",
"operation.type = 'csf-parent'",
"operation.type = 'csf-child'",
"A => B",
None,
[OrderBy(key=TelemetryFieldKey(name="duration_nano", field_context="span"), direction="desc")],
[{"name": "csf-parent-long"}, {"name": "csf-parent-short"}],
)
# POST > GET in DESC; order key is in selectFields so it appears in each row
check_order(
"non_core_field_in_select",
"operation.type = 'ncis-parent'",
"operation.type = 'ncis-child'",
"A => B",
[TelemetryFieldKey(name="http.method", field_data_type="string", field_context="attribute")],
[OrderBy(key=TelemetryFieldKey(name="http.method", field_data_type="string", field_context="attribute"), direction="desc")],
[{"http.method": "POST"}, {"http.method": "GET"}],
)
# ---------------------------------------------------------------------------
# Expression × returnSpansFrom test
# ---------------------------------------------------------------------------
def test_trace_operator_expressions(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
) -> None:
"""
Covers all five operators × two returnSpansFrom settings in a single pass.
All test spans are inserted once; each operator uses a unique op_type prefix
so queries never interfere with each other.
For each operator:
default (returnSpansFrom="") — only spans satisfying the structural predicate
return_A (returnSpansFrom="A") — A spans from traces where the predicate held
Unary NOT A is skipped: its root CTE reads from all_spans (unbounded by any
filter), making row counts non-deterministic across a shared ClickHouse session.
"""
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
insert_traces(
[
# A => B: trace 1 matches (dc-root directly parents dc-leaf); trace 2 does not
*_chain_trace(now, ("dc-root", "svc-dc-a", "dc-root", 5), ("dc-leaf", "svc-dc-a", "dc-leaf", 2)),
*_chain_trace(now, ("dc-root-only", "svc-dc-b", "dc-root", 2)),
# A -> B: trace 1 matches (id-gp is an indirect ancestor of id-gc); trace 2 does not
*_chain_trace(
now,
("id-gp", "svc-id-a", "id-gp", 5),
("id-mid", "svc-id-a", "id-mid", 3),
("id-gc", "svc-id-a", "id-gc", 1),
),
*_chain_trace(now, ("id-gp-only", "svc-id-b", "id-gp", 2)),
# A && B: trace 1 matches (contains both A and B); trace 2 does not (no B)
*_chain_trace(now, ("and-root", "svc-and-a", "and-root", 5), ("and-leaf", "svc-and-a", "and-leaf", 2)),
*_chain_trace(now, ("and-root-only", "svc-and-b", "and-root", 2)),
# A || B: trace 1 has A only, trace 2 has B only (both match A || B)
*_chain_trace(now, ("or-a-span", "svc-or-a", "or-a", 5)),
*_chain_trace(now, ("or-b-span", "svc-or-b", "or-b", 2)),
# A NOT B: trace 1 has A + B child (does NOT match); trace 2 has A only (matches)
*_chain_trace(now, ("not-root-with-child", "svc-not-a", "not-root", 5), ("not-child", "svc-not-a", "not-child", 2)),
*_chain_trace(now, ("not-root-no-child", "svc-not-b", "not-root", 2)),
# noise — must not surface in any query below
*_chain_trace(now, ("noise-span", "svc-noise", "noise-op", 1)),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
def check(case_id, filter_a, filter_b, expression, return_spans_from, expected_names):
resp = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="raw",
queries=[
_builder_query("A", filter_a),
_builder_query("B", filter_b),
TraceOperatorQuery(name="C", expression=expression, return_spans_from=return_spans_from, limit=100).to_dict(),
],
)
assert resp.status_code == HTTPStatus.OK, f"[{case_id}] {resp.text}"
rows = resp.json()["data"]["data"]["results"][0].get("rows") or []
actual = {r["data"]["name"] for r in rows}
assert actual == expected_names, f"[{case_id}] expected {expected_names!r}, got {actual!r}"
# ── A => B (direct child) ────────────────────────────────────────────────
check("direct_child_default", "operation.type = 'dc-root'", "operation.type = 'dc-leaf'", "A => B", "", {"dc-root"})
check("direct_child_return_A", "operation.type = 'dc-root'", "operation.type = 'dc-leaf'", "A => B", "A", {"dc-root"})
# ── A -> B (indirect descendant) ─────────────────────────────────────────
check("indirect_descendant_default", "operation.type = 'id-gp'", "operation.type = 'id-gc'", "A -> B", "", {"id-gp"})
check("indirect_descendant_return_A", "operation.type = 'id-gp'", "operation.type = 'id-gc'", "A -> B", "A", {"id-gp"})
# ── A && B ────────────────────────────────────────────────────────────────
check("and_default", "operation.type = 'and-root'", "operation.type = 'and-leaf'", "A && B", "", {"and-root"})
check("and_return_A", "operation.type = 'and-root'", "operation.type = 'and-leaf'", "A && B", "A", {"and-root"})
# ── A || B ────────────────────────────────────────────────────────────────
# default returns UNION of A and B; return_A returns only A spans from matching traces
check("or_default", "operation.type = 'or-a'", "operation.type = 'or-b'", "A || B", "", {"or-a-span", "or-b-span"})
check("or_return_A", "operation.type = 'or-a'", "operation.type = 'or-b'", "A || B", "A", {"or-a-span"})
# ── A NOT B (binary not) ──────────────────────────────────────────────────
check("not_binary_default", "operation.type = 'not-root'", "operation.type = 'not-child'", "A NOT B", "", {"not-root-no-child"})
check("not_binary_return_A", "operation.type = 'not-root'", "operation.type = 'not-child'", "A NOT B", "A", {"not-root-no-child"})