Compare commits

..

6 Commits

Author SHA1 Message Date
Piyush Singariya
abbb6f7137 revert: var name change 2026-05-26 21:11:48 +05:30
Piyush Singariya
c1f3dacc0b fix: remove old pipelines 2026-05-26 21:10:55 +05:30
Piyush Singariya
2067b0f634 revert: test name 2026-05-26 20:58:11 +05:30
Piyush Singariya
75df0c00ef Merge branch 'main' into order-pipelines 2026-05-26 19:09:15 +05:30
Piyush Singariya
d03ff055bb fix: preserve order of pipelines between memory_limiter and batch 2026-05-26 19:08:07 +05:30
Piyush Singariya
2b10fbd4dd chore: var names changed 2026-05-26 17:38:27 +05:30
6 changed files with 132 additions and 683 deletions

View File

@@ -2,14 +2,11 @@ package logparsingpipeline
import (
"encoding/json"
"fmt"
"strings"
"sync"
"gopkg.in/yaml.v3"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
@@ -26,6 +23,13 @@ var (
CodeCollectorConfigLogsPipelineNotFound = errors.MustNewCode("collector_config_logs_pipeline_not_found")
)
const (
memoryLimiterProcessor = "memory_limiter"
memoryLimiterProcessorPrefix = "memory_limiter/"
batchProcessor = "batch"
batchProcessorPrefix = "batch/"
)
// check if the processors already exist
// if yes then update the processor.
// if something doesn't exists then remove it.
@@ -79,6 +83,14 @@ func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, er
return &p, nil
}
// buildCollectorPipelineProcessorsList assembles the final processor list in the
// required order:
//
// 1. memory_limiter processors (any processor named "memory_limiter" or "memory_limiter/<id>")
// 2. other existing processors (in their original order), which may include signoz processors
// that are not user-pipeline processors
// 3. signoz user-pipeline processors
// 4. batch processors (any processor named "batch" or "batch/<id>")
func buildCollectorPipelineProcessorsList(
currentCollectorProcessors []string,
signozPipelineProcessorNames []string,
@@ -86,90 +98,59 @@ func buildCollectorPipelineProcessorsList(
lockLogsPipelineSpec.Lock()
defer lockLogsPipelineSpec.Unlock()
exists := map[string]struct{}{}
for _, v := range signozPipelineProcessorNames {
exists[v] = struct{}{}
// Build a set of the desired signoz processors so we can drop any stale version
// of them (regardless of how they got into the current config) without
// accidentally duplicating them in the output.
desiredUserPipelineSet := make(map[string]struct{}, len(signozPipelineProcessorNames))
for _, p := range signozPipelineProcessorNames {
desiredUserPipelineSet[p] = struct{}{}
}
// removed the old processors which are not used
var pipeline []string
for _, procName := range currentCollectorProcessors {
_, isInDesiredPipelineProcs := exists[procName]
if isInDesiredPipelineProcs || !hasSignozPipelineProcessorPrefix(procName) {
pipeline = append(pipeline, procName)
}
}
result := make([]string, 0, len(currentCollectorProcessors)+len(signozPipelineProcessorNames))
// create a reverse map of existing config processors and their position
existing := map[string]int{}
for i, p := range pipeline {
name := p
existing[name] = i
}
// create mapping from our logsParserPipeline to position in existing processors (from current config)
// this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3
specVsExistingMap := map[int]int{}
existingVsSpec := map[int]int{}
// go through plan and map its elements to current positions in effective config
for i, m := range signozPipelineProcessorNames {
if loc, ok := existing[m]; ok {
specVsExistingMap[i] = loc
existingVsSpec[loc] = i
}
}
lastMatched := 0
newPipeline := []string{}
for i := 0; i < len(signozPipelineProcessorNames); i++ {
m := signozPipelineProcessorNames[i]
if loc, ok := specVsExistingMap[i]; ok {
for j := lastMatched; j < loc; j++ {
if hasSignozPipelineProcessorPrefix(pipeline[j]) {
delete(specVsExistingMap, existingVsSpec[j])
} else {
newPipeline = append(newPipeline, pipeline[j])
}
// Note: logic assumes there'll be only one batch processor
var batchProcIdx int
var batchProcFound bool
iteration:
for idx, p := range currentCollectorProcessors {
_, inDesiredSet := desiredUserPipelineSet[p]
switch {
// same processor exist; retain the location of pre-existing location
case p == memoryLimiterProcessor || strings.HasPrefix(p, memoryLimiterProcessorPrefix):
result = append(result, p)
case hasSignozPipelineProcessorPrefix(p):
// this processor has been dropped
if !inDesiredSet {
continue iteration
} else {
result = append(result, p)
}
newPipeline = append(newPipeline, pipeline[loc])
lastMatched = loc + 1
} else {
newPipeline = append(newPipeline, m)
case p == batchProcessor || strings.HasPrefix(p, batchProcessorPrefix):
batchProcIdx = idx
batchProcFound = true
break iteration
default:
result = append(result, p)
}
}
if lastMatched < len(pipeline) {
newPipeline = append(newPipeline, pipeline[lastMatched:]...)
}
if checkDuplicateString(newPipeline) {
// duplicates are most likely because the processor sequence in effective config conflicts
// with the planned sequence as per planned pipeline
return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline)
}
return newPipeline, nil
}
func checkDuplicateString(pipeline []string) bool {
exists := make(map[string]bool, len(pipeline))
slog.Debug("checking duplicate processors in the pipeline", "pipeline", pipeline)
for _, processor := range pipeline {
name := processor
if _, ok := exists[name]; ok {
slog.Error(
"duplicate processor name detected in generated collector config for log pipelines",
"processor", processor,
"pipeline", pipeline,
)
return true
if inDesiredSet {
// delete from desired pipeline set so they're not added twice
delete(desiredUserPipelineSet, p)
}
exists[name] = true
}
return false
// add user pipelines
for _, proc := range signozPipelineProcessorNames {
_, add := desiredUserPipelineSet[proc]
if add {
result = append(result, proc)
}
}
// add batch processor and rest
if batchProcFound {
result = append(result, currentCollectorProcessors[batchProcIdx:]...)
}
return result, nil
}
func GenerateCollectorConfigWithPipelines(config []byte, pipelines []pipelinetypes.GettablePipeline) ([]byte, error) {

View File

@@ -106,107 +106,109 @@ func TestBuildLogParsingProcessors(t *testing.T) {
}
var BuildLogsPipelineTestData = []struct {
Name string
currentPipeline []string
logsPipeline []string
expectedPipeline []string
Name string
fromCollector []string
userPipelines []string
finalOutput []string
}{
{
Name: "Add new pipelines",
currentPipeline: []string{"processor1", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
Name: "Add new pipelines",
fromCollector: []string{"processor1", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
},
{
Name: "Add new pipeline and respect custom processors",
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"},
Name: "Add new pipeline and respect custom processors",
fromCollector: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2", constants.LogsPPLPfx + "c"},
},
{
Name: "Add new pipeline and respect custom processors",
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d", "processor2"},
Name: "Add new pipeline and respect custom processors",
fromCollector: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
},
{
Name: "Add new pipeline and respect custom processors in the beginning and middle",
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
Name: "Add new pipeline and respect custom processors in the beginning and middle",
fromCollector: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
finalOutput: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
},
{
Name: "Remove old pipeline add add new",
currentPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
Name: "Remove old pipeline add add new",
fromCollector: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a"},
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
},
{
Name: "Remove old pipeline from middle",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a"},
expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
Name: "Remove old pipeline from middle",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
},
{
Name: "Remove old pipeline from middle and add new pipeline",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"},
Name: "Remove old pipeline from middle and add new pipeline",
fromCollector: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
finalOutput: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "c", "batch"},
},
{
Name: "Remove multiple old pipelines from middle and add multiple new ones",
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"},
},
// working
{
Name: "rearrange pipelines",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch"},
Name: "Remove multiple old pipelines from middle and add multiple new ones",
fromCollector: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
finalOutput: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", "processor3", constants.LogsPPLPfx + "c", "processor4", "processor5", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c1", "batch"},
},
{
Name: "rearrange pipelines with new processor",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
// expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
Name: "rearrange pipelines",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
},
{
Name: "delete processor",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []string{},
expectedPipeline: []string{"processor1", "processor2", "processor3", "batch"},
Name: "rearrange pipelines with new processor",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"},
},
{
Name: "last to first",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
logsPipeline: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
expectedPipeline: []string{"processor1", "processor2", "processor3", "processor4", "batch", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
Name: "delete processor",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
userPipelines: []string{},
finalOutput: []string{"processor1", "processor2", "processor3", "batch"},
},
{
Name: "multiple rearrange pipelines",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
Name: "last to first",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
userPipelines: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch", constants.LogsPPLPfx + "_c"},
},
{
Name: "multiple rearrange with new pipelines",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
logsPipeline: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
expectedPipeline: []string{constants.LogsPPLPfx + "_z", "processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
Name: "multiple rearrange pipelines",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
},
{
Name: "multiple rearrange with new pipelines",
fromCollector: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
userPipelines: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
finalOutput: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
},
{
Name: "Prefixed proc in desired set not duplicated from others",
fromCollector: []string{"memory_limiter/logs", "custom_proc", "resourcedetection", "batch/logs"},
userPipelines: []string{"custom_proc", constants.LogsPPLPfx + "a"},
finalOutput: []string{"memory_limiter/logs", "custom_proc", "resourcedetection", constants.LogsPPLPfx + "a", "batch/logs"},
},
}
func TestBuildLogsPipeline(t *testing.T) {
for _, test := range BuildLogsPipelineTestData {
Convey(test.Name, t, func() {
v, err := buildCollectorPipelineProcessorsList(test.currentPipeline, test.logsPipeline)
v, err := buildCollectorPipelineProcessorsList(test.fromCollector, test.userPipelines)
So(err, ShouldBeNil)
fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
So(v, ShouldResemble, test.expectedPipeline)
So(v, ShouldResemble, test.finalOutput)
})
}
}

View File

@@ -70,31 +70,12 @@ func (b *traceOperatorCTEBuilder) build(ctx context.Context, requestType qbtypes
selectFromCTE := rootCTEName
if b.operator.ReturnSpansFrom != "" {
sourceQueryCTE := b.queryToCTEName[b.operator.ReturnSpansFrom]
if sourceQueryCTE == "" {
selectFromCTE = b.queryToCTEName[b.operator.ReturnSpansFrom]
if selectFromCTE == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
"returnSpansFrom references query '%s' which has no corresponding CTE",
b.operator.ReturnSpansFrom)
}
filteredCTEName := fmt.Sprintf("__return_from_%s", b.operator.ReturnSpansFrom)
// rootCTEName holds one row per matching *span*, not per *trace*, so it can
// contain many rows for the same trace_id. DISTINCT de-duplicates that set
// before ClickHouse builds the hash table for the IN check, keeping memory
// usage proportional to the number of distinct traces rather than spans.
matchingTracedSB := sqlbuilder.NewSelectBuilder()
matchingTracedSB.Select("DISTINCT trace_id")
matchingTracedSB.From(rootCTEName)
matchedTracesSQL, matchedTracesArgs := matchingTracedSB.BuildWithFlavor(sqlbuilder.ClickHouse)
filteredSB := sqlbuilder.NewSelectBuilder()
filteredSB.Select("*")
filteredSB.From(sourceQueryCTE)
filteredSB.Where(fmt.Sprintf("trace_id IN (%s)", matchedTracesSQL))
filteredSQL, filteredArgs := filteredSB.BuildWithFlavor(sqlbuilder.ClickHouse, matchedTracesArgs...)
b.addCTE(filteredCTEName, filteredSQL, filteredArgs, []string{sourceQueryCTE, rootCTEName})
selectFromCTE = filteredCTEName
}
finalStmt, err := b.buildFinalQuery(ctx, selectFromCTE, requestType)

View File

@@ -385,82 +385,6 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
},
expectedErr: nil,
},
{
name: "returnSpansFrom B: A -> B return B spans filtered by operator",
requestType: qbtypes.RequestTypeRaw,
operator: qbtypes.QueryBuilderTraceOperator{
Expression: "A -> B",
ReturnSpansFrom: "B",
Limit: 10,
},
compositeQuery: &qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __return_from_B AS (SELECT * FROM B WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "returnSpansFrom C: (A -> B) && C return C spans filtered by operator",
requestType: qbtypes.RequestTypeRaw,
operator: qbtypes.QueryBuilderTraceOperator{
Expression: "(A -> B) && C",
ReturnSpansFrom: "C",
Limit: 10,
},
compositeQuery: &qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "C",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'auth'"},
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __resource_filter_C AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), C AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_C) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B_AND_C AS (SELECT l.* FROM A_INDIR_DESC_B AS l INNER JOIN C AS r ON l.trace_id = r.trace_id), __return_from_C AS (SELECT * FROM C WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B_AND_C)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_C ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "auth", "%service.name%", "%service.name\":\"auth%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()

View File

@@ -72,7 +72,6 @@ class TraceOperatorQuery:
return_spans_from: str | None = None
limit: int | None = None
order: list[OrderBy] | None = None
select_fields: list[TelemetryFieldKey] | None = None
def to_dict(self) -> dict:
spec: dict[str, Any] = {
@@ -85,8 +84,6 @@ class TraceOperatorQuery:
spec["limit"] = self.limit
if self.order:
spec["order"] = [o.to_dict() if hasattr(o, "to_dict") else o for o in self.order]
if self.select_fields:
spec["selectFields"] = [f.to_dict() for f in self.select_fields]
return {"type": "builder_trace_operator", "spec": spec}

View File

@@ -1,436 +0,0 @@
"""
Integration tests for TraceOperatorQuery (builder_trace_operator) through the
/api/v5/query_range endpoint.
Covers:
1. Order-by variants (A -> B, A => B) with returnSpansFrom="A".
Guards against the NOT_FOUND_COLUMN_IN_BLOCK regression where ordering by a
column absent from an outer SELECT caused a query failure.
2. Expression operators (=>, ->, &&, ||, A NOT B) with and without returnSpansFrom.
returnSpansFrom semantics
--------------------------
returnSpansFrom="" (default)
The final rows come from the expression's root CTE. Only spans that
directly satisfy the structural predicate are returned.
returnSpansFrom="A"
The expression is still evaluated in full (the structural relationship
must hold), but the final rows are drawn from the A sub-query CTE,
filtered to traces that appeared in the expression result. Concretely:
the query returns every A span whose trace_id belongs to a trace that
matched the expression.
"""
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.querier import get_rows
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
def _names(response: requests.Response) -> set:
return {r["data"]["name"] for r in get_rows(response)}
# ============================================================================
# Dataset — 4 traces using real OTel semantic-convention attributes
#
# Filter A = "http.method EXISTS" (HTTP entry-point spans)
# Filter B = "db.system = 'redis'" (direct Redis cache calls)
# / "db.system = 'postgresql'" (deeper DB queries, for indirect tests)
# / "messaging.system = 'kafka'" (async consumer, for OR tests)
#
# T1 checkout-svc [SERVER] POST /checkout (5s) ← structural root, http.method=POST
# ├─ proxy-svc [SERVER] api-proxy (3s) ← http.method=POST; no db children
# └─ [CLIENT] lookup-cart (redis)
# └─ [CLIENT] check-inventory (postgresql)
#
# T2 catalog-svc [SERVER] GET /catalog (1s) ← http.method=GET
# └─ [CLIENT] fetch-catalog (redis)
# └─ [CLIENT] read-cache (postgresql)
#
# T3 standalone-svc [SERVER] standalone-server ← http.method=POST; no db/cache children
# T4 isolated-svc [CONSUMER] isolated-worker ← messaging.system=kafka; no http.method → not in A
#
# T1 has TWO spans matching filter A (http.method EXISTS); returnSpansFrom changes what is returned:
# default → only spans that directly satisfy the structural predicate
# return_A → all matching A spans from traces where the predicate held
#
# Expression truth table:
# A -> B (indirect) A=http.method EXISTS B=db.system='postgresql' T1✓ T2✓ T3✗ T4✗
# A => B (direct) A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
# A && B A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
# A || B A=http.method EXISTS B=messaging.system='kafka' T1✓ T2✓ T3✓ T4✓
# A NOT B A=http.method EXISTS B=db.system='redis' T1✗ T2✗ T3✓ T4✗
#
# Order-by cases (all use returnSpansFrom=A, 3 rows expected):
# ob.indirect A->B order http.method DESC → POST(checkout+proxy), GET(catalog)
# ob.duration A=>B order duration_nano DESC → POST/checkout(5s), api-proxy(3s), GET/catalog(1s)
# ob.select A=>B order http.method DESC → POST, POST, GET
# ============================================================================
@pytest.mark.parametrize(
"case",
[
# ── Order-by: http.method DESC, NOT in selectFields ──────────────────────
# Guards against NOT_FOUND_COLUMN_IN_BLOCK: ordering by a column absent from
# the outer SELECT used to cause a ClickHouse query failure.
# returnSpansFrom="A" returns all T1 http.method spans: POST /checkout and api-proxy
# (both http.method="POST", services checkout-svc and proxy-svc), plus
# GET /catalog (http.method="GET") from T2.
# The two POST spans are tied so their relative order is undefined; catalog-svc
# (GET) is guaranteed to sort last.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'postgresql'",
"expression": "A -> B",
"return_spans_from": "A",
"select_fields": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
"validate": lambda r: len(get_rows(r)) == 3 and {get_rows(r)[0]["data"]["service.name"], get_rows(r)[1]["data"]["service.name"]} == {"checkout-svc", "proxy-svc"} and get_rows(r)[2]["data"]["service.name"] == "catalog-svc",
},
id="ob.indirect.http_method_not_in_select",
),
# ── Order-by: duration_nano DESC, core span field ─────────────────────────
# returnSpansFrom="A" includes api-proxy (3 s) in T1's result.
# Order: POST /checkout (5 s) > api-proxy (3 s) > GET /catalog (1 s).
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "A",
"order": [{"key": {"name": "duration_nano", "fieldContext": "span"}, "direction": "desc"}],
"validate": lambda r: len(get_rows(r)) == 3 and get_rows(r)[0]["data"]["name"] == "POST /checkout" and get_rows(r)[1]["data"]["name"] == "api-proxy" and get_rows(r)[2]["data"]["name"] == "GET /catalog",
},
id="ob.duration.duration_nano_desc",
),
# ── Order-by: http.method DESC, IS in selectFields ────────────────────────
# http.method is selected so it appears in each result row.
# Both POST /checkout and api-proxy carry http.method="POST"; their relative
# order is undefined. GET /catalog ("GET") always sorts last.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "A",
"select_fields": [{"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}],
"order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
"validate": lambda r: len(get_rows(r)) == 3 and get_rows(r)[0]["data"]["http.method"] == "POST" and get_rows(r)[1]["data"]["http.method"] == "POST" and get_rows(r)[2]["data"]["http.method"] == "GET",
},
id="ob.select.http_method_in_select",
),
# ── A => B (direct child), returnSpansFrom="" ─────────────────────────────
# POST /checkout directly parents lookup-cart (redis); api-proxy has no redis child.
# T3 does not match (no redis descendant). Default returns only the satisfying A spans.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "",
"validate": lambda r: len(get_rows(r)) == 2 and _names(r) == {"POST /checkout", "GET /catalog"},
},
id="ex.direct_child.default",
),
# ── A => B (direct child), returnSpansFrom="A" ────────────────────────────
# T1 matches; return_A pulls all T1 http.method spans → api-proxy is included too.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "A",
"validate": lambda r: len(get_rows(r)) == 3 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
},
id="ex.direct_child.return_A",
),
# ── A -> B (indirect descendant), returnSpansFrom="" ──────────────────────
# POST /checkout is an ancestor of check-inventory (postgresql) via lookup-cart.
# api-proxy has no postgresql descendants. T3 has no postgresql descendant.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'postgresql'",
"expression": "A -> B",
"return_spans_from": "",
"validate": lambda r: len(get_rows(r)) == 2 and _names(r) == {"POST /checkout", "GET /catalog"},
},
id="ex.indirect_descendant.default",
),
# ── A -> B (indirect descendant), returnSpansFrom="A" ────────────────────
# T1 matches; return_A pulls all T1 http.method spans → api-proxy is included too.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'postgresql'",
"expression": "A -> B",
"return_spans_from": "A",
"validate": lambda r: len(get_rows(r)) == 3 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
},
id="ex.indirect_descendant.return_A",
),
# ── A && B (both present in same trace), returnSpansFrom="" ───────────────
# T1 and T2 match (each trace has http.method spans AND redis spans); T3 does not.
# A && B returns all A spans from matching traces — api-proxy is included
# because it shares T1's trace_id with POST /checkout.
# (return_A produces the same set by definition; no separate case needed.)
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A && B",
"return_spans_from": "",
"validate": lambda r: len(get_rows(r)) == 3 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
},
id="ex.and.default",
),
# ── A || B (either present), returnSpansFrom="" ───────────────────────────
# T1, T2, T3 match via A (http.method spans); T4 matches via B (kafka span).
# Default returns UNION of all A and B spans from matching traces.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "messaging.system = 'kafka'",
"expression": "A || B",
"return_spans_from": "",
"validate": lambda r: len(get_rows(r)) == 5 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server", "isolated-worker"},
},
id="ex.or.default",
),
# ── A || B, returnSpansFrom="A" ───────────────────────────────────────────
# All four traces match; only A spans are returned.
# T4 has no http.method span, so it contributes nothing to A.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "messaging.system = 'kafka'",
"expression": "A || B",
"return_spans_from": "A",
"validate": lambda r: len(get_rows(r)) == 4 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server"},
},
id="ex.or.return_A",
),
# ── A NOT B (A present, B absent from trace), returnSpansFrom="" ─────────
# T1 and T2 do NOT match: their traces contain redis spans.
# T3 MATCHES: has http.method span but no redis span in its trace.
# T4 has no http.method span, so it cannot contribute an A span.
# (return_A produces the same set; no separate case needed.)
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A NOT B",
"return_spans_from": "",
"validate": lambda r: len(get_rows(r)) == 1 and _names(r) == {"standalone-server"},
},
id="ex.not.default",
),
],
)
def test_trace_operator(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
case: dict,
) -> None:
t1_trace_id = TraceIdGenerator.trace_id()
t1_checkout_span_id = TraceIdGenerator.span_id() # POST /checkout — structural root of T1
t1_child_span_id = TraceIdGenerator.span_id() # lookup-cart
t2_trace_id = TraceIdGenerator.trace_id()
t2_root_span_id = TraceIdGenerator.span_id()
t2_child_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
insert_traces(
[
# T1 — two http.method spans in the same trace, modelling a real proxy+service pair.
# POST /checkout (checkout-svc) is the root (parent_span_id="").
# api-proxy (proxy-svc) is a structural child of POST /checkout but also has
# http.method set, so it matches filter A alongside POST /checkout.
# Both carry http.method="POST" — they differ only in service.name.
# This is what makes returnSpansFrom="" and returnSpansFrom="A" distinct:
# default → only POST /checkout satisfies A => B or A -> B
# return_A → api-proxy is pulled in too (all A spans from the matching trace)
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=5),
trace_id=t1_trace_id,
span_id=t1_checkout_span_id,
parent_span_id="",
name="POST /checkout",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "checkout-svc"},
attributes={"http.method": "POST", "http.route": "/checkout"},
),
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=3),
trace_id=t1_trace_id,
span_id=TraceIdGenerator.span_id(),
parent_span_id=t1_checkout_span_id,
name="api-proxy",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "proxy-svc"},
attributes={"http.method": "POST", "http.route": "/proxy"},
),
Traces(
timestamp=now - timedelta(seconds=9),
duration=timedelta(seconds=2),
trace_id=t1_trace_id,
span_id=t1_child_span_id,
parent_span_id=t1_checkout_span_id,
name="lookup-cart",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "checkout-svc"},
attributes={"db.system": "redis", "db.operation": "GET"},
),
Traces(
timestamp=now - timedelta(seconds=8),
duration=timedelta(seconds=1),
trace_id=t1_trace_id,
span_id=TraceIdGenerator.span_id(),
parent_span_id=t1_child_span_id,
name="check-inventory",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "checkout-svc"},
attributes={"db.system": "postgresql", "db.operation": "SELECT"},
),
# T2 — catalog-svc: GET /catalog (1 s root) → fetch-catalog (redis) → read-cache (postgresql)
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=t2_trace_id,
span_id=t2_root_span_id,
parent_span_id="",
name="GET /catalog",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "catalog-svc"},
attributes={"http.method": "GET", "http.route": "/catalog"},
),
Traces(
timestamp=now - timedelta(seconds=9),
duration=timedelta(seconds=2),
trace_id=t2_trace_id,
span_id=t2_child_span_id,
parent_span_id=t2_root_span_id,
name="fetch-catalog",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "catalog-svc"},
attributes={"db.system": "redis", "db.operation": "GET"},
),
Traces(
timestamp=now - timedelta(seconds=8),
duration=timedelta(seconds=1),
trace_id=t2_trace_id,
span_id=TraceIdGenerator.span_id(),
parent_span_id=t2_child_span_id,
name="read-cache",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "catalog-svc"},
attributes={"db.system": "postgresql", "db.operation": "SELECT"},
),
# T3 — standalone-svc: HTTP entry span with no downstream calls.
# Fails A => B / A -> B / A && B (no redis/postgresql descendant).
# Matches A NOT B (has http.method span, no redis child).
# Contributes to A || B via A.
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=3),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="standalone-server",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "standalone-svc"},
attributes={"http.method": "POST", "http.route": "/"},
),
# T4 — isolated-svc: Kafka consumer; no http.method so it never matches filter A.
# Used only as the B side of A || B to prove the OR operator matches via B.
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="isolated-worker",
kind=TracesKind.SPAN_KIND_CONSUMER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "isolated-svc"},
attributes={"messaging.system": "kafka", "messaging.destination": "orders"},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
spec: dict = {
"name": "C",
"expression": case["expression"],
"returnSpansFrom": case.get("return_spans_from", ""),
"limit": case.get("limit", 100),
}
if case.get("select_fields"):
spec["selectFields"] = case["select_fields"]
if case.get("order"):
spec["order"] = case["order"]
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "raw",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {"name": "A", "signal": "traces", "filter": {"expression": case["filter_a"]}, "limit": 100},
},
{
"type": "builder_query",
"spec": {"name": "B", "signal": "traces", "filter": {"expression": case["filter_b"]}, "limit": 100},
},
{"type": "builder_trace_operator", "spec": spec},
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK, f"HTTP {response.status_code}: {response.text}"
assert case["validate"](response), f"validation failed: {response.json()}"