Compare commits

..

38 Commits

Author SHA1 Message Date
Piyush Singariya
65adf8b74e Merge branch 'main' into traceop-returnspansfrom 2026-05-26 16:30:48 +05:30
Piyush Singariya
2080176766 chore: import functions from querier 2026-05-26 16:30:22 +05:30
Piyush Singariya
f3a1f7ba0c chore: added length checks 2026-05-26 10:59:06 +05:30
Piyush Singariya
6b4cc758ec Merge branch 'main' into traceop-returnspansfrom 2026-05-26 10:18:34 +05:30
Piyush Singariya
63918541d0 fix: py fmt 2026-05-26 10:18:15 +05:30
Piyush Singariya
c1812ebf29 Merge branch 'main' into traceop-returnspansfrom 2026-05-25 20:15:41 +05:30
Piyush Singariya
331fe3bda8 chore: fix the comments 2026-05-25 20:15:27 +05:30
Piyush Singariya
8f1113c528 fix: use pytest param 2026-05-25 20:12:06 +05:30
Piyush Singariya
b347ce6a28 fix: tests are better 2026-05-25 20:07:16 +05:30
Piyush Singariya
d57281f0bb test: wip rewrite integration tests better 2026-05-25 11:27:49 +05:30
Piyush Singariya
08008ad813 Merge branch 'main' into traceop-returnspansfrom 2026-05-20 15:52:13 +05:30
Piyush Singariya
1454a96d4d fix: replace JOIN with IN 2026-05-20 15:51:15 +05:30
Piyush Singariya
4c02ee28de fix: tests rewritten 2026-05-20 10:41:55 +05:30
Piyush Singariya
e8befce898 Merge branch 'main' into traceop-returnspansfrom 2026-05-19 13:44:37 +05:30
Piyush Singariya
ec2bcbcbdc fix: integration tests 2026-05-19 13:44:06 +05:30
Piyush Singariya
370db055b3 chore: fmt py 2026-05-19 11:23:19 +05:30
Piyush Singariya
d197212918 chore: fmt py 2026-05-19 11:18:06 +05:30
Piyush Singariya
96b6d8646f chore: tests updated 2026-05-19 11:16:59 +05:30
Piyush Singariya
0aa6165b18 Merge branch 'main' into traceop-returnspansfrom 2026-05-19 11:12:59 +05:30
Piyush Singariya
dafa81f3b4 Merge branch 'main' into traceop-returnspansfrom 2026-05-12 21:03:16 +05:30
Piyush Singariya
a992a13f56 revert: unused test 2026-05-12 20:58:17 +05:30
Piyush Singariya
79b36abbd7 chore: comments and test 2026-05-12 20:57:00 +05:30
Piyush Singariya
181c307d1a Merge branch 'main' into traceop-returnspansfrom 2026-05-12 18:14:09 +05:30
Piyush Singariya
becdd4d3b4 revert: build list query 2026-05-12 18:11:35 +05:30
Piyush Singariya
de0311201a revert: double select 2026-05-12 17:15:41 +05:30
Piyush Singariya
1804bfe802 fix: return spans from 2026-05-12 16:53:31 +05:30
Piyush Singariya
357444c94e Merge branch 'main' into traceop 2026-05-11 20:53:51 +05:30
Piyush Singariya
a8598f3bfa fix: alias all core columns 2026-05-11 20:53:09 +05:30
Piyush Singariya
bca71f9a33 chore: remove comments 2026-05-11 16:04:32 +05:30
Piyush Singariya
c93660357d chore: fmt python 2026-05-11 16:02:18 +05:30
Piyush Singariya
5651e3b7a8 Merge branch 'main' into traceop 2026-05-11 14:28:58 +05:30
Piyush Singariya
cf2cfbc7d4 fix: remove specific of timestamp 2026-05-11 14:27:01 +05:30
Piyush Singariya
a969c38224 chore: fmtlint 2026-05-07 13:53:12 +05:30
Piyush Singariya
b892a0f0a5 chore: file rename 2026-05-07 13:51:22 +05:30
Piyush Singariya
4d47762eba chore: separate e2e test file 2026-05-07 13:50:11 +05:30
Piyush Singariya
77396a0bb3 Merge branch 'main' into traceop 2026-05-07 12:56:59 +05:30
Piyush Singariya
28c05e1bab Merge branch 'main' into traceop 2026-05-04 14:27:19 +05:30
Piyush Singariya
2b9e383994 fix: trace raw export e2e 2026-04-30 15:25:43 +05:30
6 changed files with 681 additions and 130 deletions

View File

@@ -2,11 +2,14 @@ package logparsingpipeline
import (
"encoding/json"
"fmt"
"strings"
"sync"
"gopkg.in/yaml.v3"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
@@ -23,13 +26,6 @@ var (
CodeCollectorConfigLogsPipelineNotFound = errors.MustNewCode("collector_config_logs_pipeline_not_found")
)
const (
memoryLimiterProcessor = "memory_limiter"
memoryLimiterProcessorPrefix = "memory_limiter/"
batchProcessor = "batch"
batchProcessorPrefix = "batch/"
)
// check if the processors already exist
// if yes then update the processor.
// if something doesn't exists then remove it.
@@ -83,14 +79,6 @@ func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, er
return &p, nil
}
// buildCollectorPipelineProcessorsList assembles the final processor list in the
// required order:
//
// 1. memory_limiter processors (any processor named "memory_limiter" or "memory_limiter/<id>")
// 2. other existing processors (in their original order), which may include signoz processors
// that are not user-pipeline processors
// 3. signoz user-pipeline processors
// 4. batch processors (any processor named "batch" or "batch/<id>")
func buildCollectorPipelineProcessorsList(
currentCollectorProcessors []string,
signozPipelineProcessorNames []string,
@@ -98,59 +86,90 @@ func buildCollectorPipelineProcessorsList(
lockLogsPipelineSpec.Lock()
defer lockLogsPipelineSpec.Unlock()
// Build a set of the desired signoz processors so we can drop any stale version
// of them (regardless of how they got into the current config) without
// accidentally duplicating them in the output.
desiredUserPipelineSet := make(map[string]struct{}, len(signozPipelineProcessorNames))
for _, p := range signozPipelineProcessorNames {
desiredUserPipelineSet[p] = struct{}{}
exists := map[string]struct{}{}
for _, v := range signozPipelineProcessorNames {
exists[v] = struct{}{}
}
result := make([]string, 0, len(currentCollectorProcessors)+len(signozPipelineProcessorNames))
// removed the old processors which are not used
var pipeline []string
for _, procName := range currentCollectorProcessors {
_, isInDesiredPipelineProcs := exists[procName]
if isInDesiredPipelineProcs || !hasSignozPipelineProcessorPrefix(procName) {
pipeline = append(pipeline, procName)
}
}
// Note: logic assumes there'll be only one batch processor
var batchProcIdx int
var batchProcFound bool
iteration:
for idx, p := range currentCollectorProcessors {
_, inDesiredSet := desiredUserPipelineSet[p]
switch {
// same processor exist; retain the location of pre-existing location
case p == memoryLimiterProcessor || strings.HasPrefix(p, memoryLimiterProcessorPrefix):
result = append(result, p)
case hasSignozPipelineProcessorPrefix(p):
// this processor has been dropped
if !inDesiredSet {
continue iteration
} else {
result = append(result, p)
// create a reverse map of existing config processors and their position
existing := map[string]int{}
for i, p := range pipeline {
name := p
existing[name] = i
}
// create mapping from our logsParserPipeline to position in existing processors (from current config)
// this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3
specVsExistingMap := map[int]int{}
existingVsSpec := map[int]int{}
// go through plan and map its elements to current positions in effective config
for i, m := range signozPipelineProcessorNames {
if loc, ok := existing[m]; ok {
specVsExistingMap[i] = loc
existingVsSpec[loc] = i
}
}
lastMatched := 0
newPipeline := []string{}
for i := 0; i < len(signozPipelineProcessorNames); i++ {
m := signozPipelineProcessorNames[i]
if loc, ok := specVsExistingMap[i]; ok {
for j := lastMatched; j < loc; j++ {
if hasSignozPipelineProcessorPrefix(pipeline[j]) {
delete(specVsExistingMap, existingVsSpec[j])
} else {
newPipeline = append(newPipeline, pipeline[j])
}
}
case p == batchProcessor || strings.HasPrefix(p, batchProcessorPrefix):
batchProcIdx = idx
batchProcFound = true
break iteration
default:
result = append(result, p)
newPipeline = append(newPipeline, pipeline[loc])
lastMatched = loc + 1
} else {
newPipeline = append(newPipeline, m)
}
if inDesiredSet {
// delete from desired pipeline set so they're not added twice
delete(desiredUserPipelineSet, p)
}
}
// add user pipelines
for _, proc := range signozPipelineProcessorNames {
_, add := desiredUserPipelineSet[proc]
if add {
result = append(result, proc)
}
if lastMatched < len(pipeline) {
newPipeline = append(newPipeline, pipeline[lastMatched:]...)
}
// add batch processor and rest
if batchProcFound {
result = append(result, currentCollectorProcessors[batchProcIdx:]...)
if checkDuplicateString(newPipeline) {
// duplicates are most likely because the processor sequence in effective config conflicts
// with the planned sequence as per planned pipeline
return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline)
}
return result, nil
return newPipeline, nil
}
func checkDuplicateString(pipeline []string) bool {
exists := make(map[string]bool, len(pipeline))
slog.Debug("checking duplicate processors in the pipeline", "pipeline", pipeline)
for _, processor := range pipeline {
name := processor
if _, ok := exists[name]; ok {
slog.Error(
"duplicate processor name detected in generated collector config for log pipelines",
"processor", processor,
"pipeline", pipeline,
)
return true
}
exists[name] = true
}
return false
}
func GenerateCollectorConfigWithPipelines(config []byte, pipelines []pipelinetypes.GettablePipeline) ([]byte, error) {

View File

@@ -106,109 +106,107 @@ func TestBuildLogParsingProcessors(t *testing.T) {
}
var BuildLogsPipelineTestData = []struct {
Name string
fromCollector []string
userPipelines []string
finalOutput []string
Name string
currentPipeline []string
logsPipeline []string
expectedPipeline []string
}{
{
Name: "Add new pipelines",
fromCollector: []string{"processor1", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
Name: "Add new pipelines",
currentPipeline: []string{"processor1", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
},
{
Name: "Add new pipeline and respect custom processors",
fromCollector: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2", constants.LogsPPLPfx + "c"},
Name: "Add new pipeline and respect custom processors",
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"},
},
{
Name: "Add new pipeline and respect custom processors",
fromCollector: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
Name: "Add new pipeline and respect custom processors",
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d", "processor2"},
},
{
Name: "Add new pipeline and respect custom processors in the beginning and middle",
fromCollector: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
finalOutput: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
Name: "Add new pipeline and respect custom processors in the beginning and middle",
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
},
{
Name: "Remove old pipeline add add new",
fromCollector: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a"},
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
Name: "Remove old pipeline add add new",
currentPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
},
{
Name: "Remove old pipeline from middle",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
Name: "Remove old pipeline from middle",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a"},
expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
},
{
Name: "Remove old pipeline from middle and add new pipeline",
fromCollector: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
finalOutput: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "c", "batch"},
Name: "Remove old pipeline from middle and add new pipeline",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"},
},
{
Name: "Remove multiple old pipelines from middle and add multiple new ones",
fromCollector: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
finalOutput: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", "processor3", constants.LogsPPLPfx + "c", "processor4", "processor5", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c1", "batch"},
Name: "Remove multiple old pipelines from middle and add multiple new ones",
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"},
},
// working
{
Name: "rearrange pipelines",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch"},
},
{
Name: "rearrange pipelines",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
Name: "rearrange pipelines with new processor",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
// expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
},
{
Name: "rearrange pipelines with new processor",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"},
Name: "delete processor",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []string{},
expectedPipeline: []string{"processor1", "processor2", "processor3", "batch"},
},
{
Name: "delete processor",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
userPipelines: []string{},
finalOutput: []string{"processor1", "processor2", "processor3", "batch"},
Name: "last to first",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
logsPipeline: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
expectedPipeline: []string{"processor1", "processor2", "processor3", "processor4", "batch", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
},
{
Name: "last to first",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
userPipelines: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch", constants.LogsPPLPfx + "_c"},
Name: "multiple rearrange pipelines",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
},
{
Name: "multiple rearrange pipelines",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
},
{
Name: "multiple rearrange with new pipelines",
fromCollector: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
userPipelines: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
finalOutput: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
},
{
Name: "Prefixed proc in desired set not duplicated from others",
fromCollector: []string{"memory_limiter/logs", "custom_proc", "resourcedetection", "batch/logs"},
userPipelines: []string{"custom_proc", constants.LogsPPLPfx + "a"},
finalOutput: []string{"memory_limiter/logs", "custom_proc", "resourcedetection", constants.LogsPPLPfx + "a", "batch/logs"},
Name: "multiple rearrange with new pipelines",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
logsPipeline: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
expectedPipeline: []string{constants.LogsPPLPfx + "_z", "processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
},
}
func TestBuildLogsPipeline(t *testing.T) {
for _, test := range BuildLogsPipelineTestData {
Convey(test.Name, t, func() {
v, err := buildCollectorPipelineProcessorsList(test.fromCollector, test.userPipelines)
v, err := buildCollectorPipelineProcessorsList(test.currentPipeline, test.logsPipeline)
So(err, ShouldBeNil)
So(v, ShouldResemble, test.finalOutput)
fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
So(v, ShouldResemble, test.expectedPipeline)
})
}
}

View File

@@ -70,12 +70,31 @@ func (b *traceOperatorCTEBuilder) build(ctx context.Context, requestType qbtypes
selectFromCTE := rootCTEName
if b.operator.ReturnSpansFrom != "" {
selectFromCTE = b.queryToCTEName[b.operator.ReturnSpansFrom]
if selectFromCTE == "" {
sourceQueryCTE := b.queryToCTEName[b.operator.ReturnSpansFrom]
if sourceQueryCTE == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput,
"returnSpansFrom references query '%s' which has no corresponding CTE",
b.operator.ReturnSpansFrom)
}
filteredCTEName := fmt.Sprintf("__return_from_%s", b.operator.ReturnSpansFrom)
// rootCTEName holds one row per matching *span*, not per *trace*, so it can
// contain many rows for the same trace_id. DISTINCT de-duplicates that set
// before ClickHouse builds the hash table for the IN check, keeping memory
// usage proportional to the number of distinct traces rather than spans.
matchingTracedSB := sqlbuilder.NewSelectBuilder()
matchingTracedSB.Select("DISTINCT trace_id")
matchingTracedSB.From(rootCTEName)
matchedTracesSQL, matchedTracesArgs := matchingTracedSB.BuildWithFlavor(sqlbuilder.ClickHouse)
filteredSB := sqlbuilder.NewSelectBuilder()
filteredSB.Select("*")
filteredSB.From(sourceQueryCTE)
filteredSB.Where(fmt.Sprintf("trace_id IN (%s)", matchedTracesSQL))
filteredSQL, filteredArgs := filteredSB.BuildWithFlavor(sqlbuilder.ClickHouse, matchedTracesArgs...)
b.addCTE(filteredCTEName, filteredSQL, filteredArgs, []string{sourceQueryCTE, rootCTEName})
selectFromCTE = filteredCTEName
}
finalStmt, err := b.buildFinalQuery(ctx, selectFromCTE, requestType)

View File

@@ -385,6 +385,82 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
},
expectedErr: nil,
},
{
name: "returnSpansFrom B: A -> B return B spans filtered by operator",
requestType: qbtypes.RequestTypeRaw,
operator: qbtypes.QueryBuilderTraceOperator{
Expression: "A -> B",
ReturnSpansFrom: "B",
Limit: 10,
},
compositeQuery: &qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __return_from_B AS (SELECT * FROM B WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "returnSpansFrom C: (A -> B) && C return C spans filtered by operator",
requestType: qbtypes.RequestTypeRaw,
operator: qbtypes.QueryBuilderTraceOperator{
Expression: "(A -> B) && C",
ReturnSpansFrom: "C",
Limit: 10,
},
compositeQuery: &qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'gateway'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'database'"},
},
},
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "C",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'auth'"},
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B AS (WITH RECURSIVE up AS (SELECT d.trace_id, d.span_id, d.parent_span_id, 0 AS depth FROM B AS d UNION ALL SELECT p.trace_id, p.span_id, p.parent_span_id, up.depth + 1 FROM all_spans AS p JOIN up ON p.trace_id = up.trace_id AND p.span_id = up.parent_span_id WHERE up.depth < 100) SELECT DISTINCT a.* FROM A AS a GLOBAL INNER JOIN (SELECT DISTINCT trace_id, span_id FROM up WHERE depth > 0 ) AS ancestors ON ancestors.trace_id = a.trace_id AND ancestors.span_id = a.span_id), __resource_filter_C AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), C AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_C) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_INDIR_DESC_B_AND_C AS (SELECT l.* FROM A_INDIR_DESC_B AS l INNER JOIN C AS r ON l.trace_id = r.trace_id), __return_from_C AS (SELECT * FROM C WHERE trace_id IN (SELECT DISTINCT trace_id FROM A_INDIR_DESC_B_AND_C)) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id FROM __return_from_C ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "gateway", "%service.name%", "%service.name\":\"gateway%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "database", "%service.name%", "%service.name\":\"database%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "auth", "%service.name%", "%service.name\":\"auth%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()

View File

@@ -72,6 +72,7 @@ class TraceOperatorQuery:
return_spans_from: str | None = None
limit: int | None = None
order: list[OrderBy] | None = None
select_fields: list[TelemetryFieldKey] | None = None
def to_dict(self) -> dict:
spec: dict[str, Any] = {
@@ -84,6 +85,8 @@ class TraceOperatorQuery:
spec["limit"] = self.limit
if self.order:
spec["order"] = [o.to_dict() if hasattr(o, "to_dict") else o for o in self.order]
if self.select_fields:
spec["selectFields"] = [f.to_dict() for f in self.select_fields]
return {"type": "builder_trace_operator", "spec": spec}

View File

@@ -0,0 +1,436 @@
"""
Integration tests for TraceOperatorQuery (builder_trace_operator) through the
/api/v5/query_range endpoint.
Covers:
1. Order-by variants (A -> B, A => B) with returnSpansFrom="A".
Guards against the NOT_FOUND_COLUMN_IN_BLOCK regression where ordering by a
column absent from an outer SELECT caused a query failure.
2. Expression operators (=>, ->, &&, ||, A NOT B) with and without returnSpansFrom.
returnSpansFrom semantics
--------------------------
returnSpansFrom="" (default)
The final rows come from the expression's root CTE. Only spans that
directly satisfy the structural predicate are returned.
returnSpansFrom="A"
The expression is still evaluated in full (the structural relationship
must hold), but the final rows are drawn from the A sub-query CTE,
filtered to traces that appeared in the expression result. Concretely:
the query returns every A span whose trace_id belongs to a trace that
matched the expression.
"""
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.querier import get_rows
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
def _names(response: requests.Response) -> set:
return {r["data"]["name"] for r in get_rows(response)}
# ============================================================================
# Dataset — 4 traces using real OTel semantic-convention attributes
#
# Filter A = "http.method EXISTS" (HTTP entry-point spans)
# Filter B = "db.system = 'redis'" (direct Redis cache calls)
# / "db.system = 'postgresql'" (deeper DB queries, for indirect tests)
# / "messaging.system = 'kafka'" (async consumer, for OR tests)
#
# T1 checkout-svc [SERVER] POST /checkout (5s) ← structural root, http.method=POST
# ├─ proxy-svc [SERVER] api-proxy (3s) ← http.method=POST; no db children
# └─ [CLIENT] lookup-cart (redis)
# └─ [CLIENT] check-inventory (postgresql)
#
# T2 catalog-svc [SERVER] GET /catalog (1s) ← http.method=GET
# └─ [CLIENT] fetch-catalog (redis)
# └─ [CLIENT] read-cache (postgresql)
#
# T3 standalone-svc [SERVER] standalone-server ← http.method=POST; no db/cache children
# T4 isolated-svc [CONSUMER] isolated-worker ← messaging.system=kafka; no http.method → not in A
#
# T1 has TWO spans matching filter A (http.method EXISTS); returnSpansFrom changes what is returned:
# default → only spans that directly satisfy the structural predicate
# return_A → all matching A spans from traces where the predicate held
#
# Expression truth table:
# A -> B (indirect) A=http.method EXISTS B=db.system='postgresql' T1✓ T2✓ T3✗ T4✗
# A => B (direct) A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
# A && B A=http.method EXISTS B=db.system='redis' T1✓ T2✓ T3✗ T4✗
# A || B A=http.method EXISTS B=messaging.system='kafka' T1✓ T2✓ T3✓ T4✓
# A NOT B A=http.method EXISTS B=db.system='redis' T1✗ T2✗ T3✓ T4✗
#
# Order-by cases (all use returnSpansFrom=A, 3 rows expected):
# ob.indirect A->B order http.method DESC → POST(checkout+proxy), GET(catalog)
# ob.duration A=>B order duration_nano DESC → POST/checkout(5s), api-proxy(3s), GET/catalog(1s)
# ob.select A=>B order http.method DESC → POST, POST, GET
# ============================================================================
@pytest.mark.parametrize(
"case",
[
# ── Order-by: http.method DESC, NOT in selectFields ──────────────────────
# Guards against NOT_FOUND_COLUMN_IN_BLOCK: ordering by a column absent from
# the outer SELECT used to cause a ClickHouse query failure.
# returnSpansFrom="A" returns all T1 http.method spans: POST /checkout and api-proxy
# (both http.method="POST", services checkout-svc and proxy-svc), plus
# GET /catalog (http.method="GET") from T2.
# The two POST spans are tied so their relative order is undefined; catalog-svc
# (GET) is guaranteed to sort last.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'postgresql'",
"expression": "A -> B",
"return_spans_from": "A",
"select_fields": [{"name": "service.name", "fieldDataType": "string", "fieldContext": "resource"}],
"order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
"validate": lambda r: len(get_rows(r)) == 3 and {get_rows(r)[0]["data"]["service.name"], get_rows(r)[1]["data"]["service.name"]} == {"checkout-svc", "proxy-svc"} and get_rows(r)[2]["data"]["service.name"] == "catalog-svc",
},
id="ob.indirect.http_method_not_in_select",
),
# ── Order-by: duration_nano DESC, core span field ─────────────────────────
# returnSpansFrom="A" includes api-proxy (3 s) in T1's result.
# Order: POST /checkout (5 s) > api-proxy (3 s) > GET /catalog (1 s).
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "A",
"order": [{"key": {"name": "duration_nano", "fieldContext": "span"}, "direction": "desc"}],
"validate": lambda r: len(get_rows(r)) == 3 and get_rows(r)[0]["data"]["name"] == "POST /checkout" and get_rows(r)[1]["data"]["name"] == "api-proxy" and get_rows(r)[2]["data"]["name"] == "GET /catalog",
},
id="ob.duration.duration_nano_desc",
),
# ── Order-by: http.method DESC, IS in selectFields ────────────────────────
# http.method is selected so it appears in each result row.
# Both POST /checkout and api-proxy carry http.method="POST"; their relative
# order is undefined. GET /catalog ("GET") always sorts last.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "A",
"select_fields": [{"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}],
"order": [{"key": {"name": "http.method", "fieldDataType": "string", "fieldContext": "attribute"}, "direction": "desc"}],
"validate": lambda r: len(get_rows(r)) == 3 and get_rows(r)[0]["data"]["http.method"] == "POST" and get_rows(r)[1]["data"]["http.method"] == "POST" and get_rows(r)[2]["data"]["http.method"] == "GET",
},
id="ob.select.http_method_in_select",
),
# ── A => B (direct child), returnSpansFrom="" ─────────────────────────────
# POST /checkout directly parents lookup-cart (redis); api-proxy has no redis child.
# T3 does not match (no redis descendant). Default returns only the satisfying A spans.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "",
"validate": lambda r: len(get_rows(r)) == 2 and _names(r) == {"POST /checkout", "GET /catalog"},
},
id="ex.direct_child.default",
),
# ── A => B (direct child), returnSpansFrom="A" ────────────────────────────
# T1 matches; return_A pulls all T1 http.method spans → api-proxy is included too.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A => B",
"return_spans_from": "A",
"validate": lambda r: len(get_rows(r)) == 3 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
},
id="ex.direct_child.return_A",
),
# ── A -> B (indirect descendant), returnSpansFrom="" ──────────────────────
# POST /checkout is an ancestor of check-inventory (postgresql) via lookup-cart.
# api-proxy has no postgresql descendants. T3 has no postgresql descendant.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'postgresql'",
"expression": "A -> B",
"return_spans_from": "",
"validate": lambda r: len(get_rows(r)) == 2 and _names(r) == {"POST /checkout", "GET /catalog"},
},
id="ex.indirect_descendant.default",
),
# ── A -> B (indirect descendant), returnSpansFrom="A" ────────────────────
# T1 matches; return_A pulls all T1 http.method spans → api-proxy is included too.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'postgresql'",
"expression": "A -> B",
"return_spans_from": "A",
"validate": lambda r: len(get_rows(r)) == 3 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
},
id="ex.indirect_descendant.return_A",
),
# ── A && B (both present in same trace), returnSpansFrom="" ───────────────
# T1 and T2 match (each trace has http.method spans AND redis spans); T3 does not.
# A && B returns all A spans from matching traces — api-proxy is included
# because it shares T1's trace_id with POST /checkout.
# (return_A produces the same set by definition; no separate case needed.)
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A && B",
"return_spans_from": "",
"validate": lambda r: len(get_rows(r)) == 3 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy"},
},
id="ex.and.default",
),
# ── A || B (either present), returnSpansFrom="" ───────────────────────────
# T1, T2, T3 match via A (http.method spans); T4 matches via B (kafka span).
# Default returns UNION of all A and B spans from matching traces.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "messaging.system = 'kafka'",
"expression": "A || B",
"return_spans_from": "",
"validate": lambda r: len(get_rows(r)) == 5 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server", "isolated-worker"},
},
id="ex.or.default",
),
# ── A || B, returnSpansFrom="A" ───────────────────────────────────────────
# All four traces match; only A spans are returned.
# T4 has no http.method span, so it contributes nothing to A.
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "messaging.system = 'kafka'",
"expression": "A || B",
"return_spans_from": "A",
"validate": lambda r: len(get_rows(r)) == 4 and _names(r) == {"POST /checkout", "GET /catalog", "api-proxy", "standalone-server"},
},
id="ex.or.return_A",
),
# ── A NOT B (A present, B absent from trace), returnSpansFrom="" ─────────
# T1 and T2 do NOT match: their traces contain redis spans.
# T3 MATCHES: has http.method span but no redis span in its trace.
# T4 has no http.method span, so it cannot contribute an A span.
# (return_A produces the same set; no separate case needed.)
pytest.param(
{
"filter_a": "http.method EXISTS",
"filter_b": "db.system = 'redis'",
"expression": "A NOT B",
"return_spans_from": "",
"validate": lambda r: len(get_rows(r)) == 1 and _names(r) == {"standalone-server"},
},
id="ex.not.default",
),
],
)
def test_trace_operator(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
case: dict,
) -> None:
t1_trace_id = TraceIdGenerator.trace_id()
t1_checkout_span_id = TraceIdGenerator.span_id() # POST /checkout — structural root of T1
t1_child_span_id = TraceIdGenerator.span_id() # lookup-cart
t2_trace_id = TraceIdGenerator.trace_id()
t2_root_span_id = TraceIdGenerator.span_id()
t2_child_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
insert_traces(
[
# T1 — two http.method spans in the same trace, modelling a real proxy+service pair.
# POST /checkout (checkout-svc) is the root (parent_span_id="").
# api-proxy (proxy-svc) is a structural child of POST /checkout but also has
# http.method set, so it matches filter A alongside POST /checkout.
# Both carry http.method="POST" — they differ only in service.name.
# This is what makes returnSpansFrom="" and returnSpansFrom="A" distinct:
# default → only POST /checkout satisfies A => B or A -> B
# return_A → api-proxy is pulled in too (all A spans from the matching trace)
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=5),
trace_id=t1_trace_id,
span_id=t1_checkout_span_id,
parent_span_id="",
name="POST /checkout",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "checkout-svc"},
attributes={"http.method": "POST", "http.route": "/checkout"},
),
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=3),
trace_id=t1_trace_id,
span_id=TraceIdGenerator.span_id(),
parent_span_id=t1_checkout_span_id,
name="api-proxy",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "proxy-svc"},
attributes={"http.method": "POST", "http.route": "/proxy"},
),
Traces(
timestamp=now - timedelta(seconds=9),
duration=timedelta(seconds=2),
trace_id=t1_trace_id,
span_id=t1_child_span_id,
parent_span_id=t1_checkout_span_id,
name="lookup-cart",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "checkout-svc"},
attributes={"db.system": "redis", "db.operation": "GET"},
),
Traces(
timestamp=now - timedelta(seconds=8),
duration=timedelta(seconds=1),
trace_id=t1_trace_id,
span_id=TraceIdGenerator.span_id(),
parent_span_id=t1_child_span_id,
name="check-inventory",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "checkout-svc"},
attributes={"db.system": "postgresql", "db.operation": "SELECT"},
),
# T2 — catalog-svc: GET /catalog (1 s root) → fetch-catalog (redis) → read-cache (postgresql)
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=t2_trace_id,
span_id=t2_root_span_id,
parent_span_id="",
name="GET /catalog",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "catalog-svc"},
attributes={"http.method": "GET", "http.route": "/catalog"},
),
Traces(
timestamp=now - timedelta(seconds=9),
duration=timedelta(seconds=2),
trace_id=t2_trace_id,
span_id=t2_child_span_id,
parent_span_id=t2_root_span_id,
name="fetch-catalog",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "catalog-svc"},
attributes={"db.system": "redis", "db.operation": "GET"},
),
Traces(
timestamp=now - timedelta(seconds=8),
duration=timedelta(seconds=1),
trace_id=t2_trace_id,
span_id=TraceIdGenerator.span_id(),
parent_span_id=t2_child_span_id,
name="read-cache",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "catalog-svc"},
attributes={"db.system": "postgresql", "db.operation": "SELECT"},
),
# T3 — standalone-svc: HTTP entry span with no downstream calls.
# Fails A => B / A -> B / A && B (no redis/postgresql descendant).
# Matches A NOT B (has http.method span, no redis child).
# Contributes to A || B via A.
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=3),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="standalone-server",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "standalone-svc"},
attributes={"http.method": "POST", "http.route": "/"},
),
# T4 — isolated-svc: Kafka consumer; no http.method so it never matches filter A.
# Used only as the B side of A || B to prove the OR operator matches via B.
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=1),
trace_id=TraceIdGenerator.trace_id(),
span_id=TraceIdGenerator.span_id(),
parent_span_id="",
name="isolated-worker",
kind=TracesKind.SPAN_KIND_CONSUMER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={"service.name": "isolated-svc"},
attributes={"messaging.system": "kafka", "messaging.destination": "orders"},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
spec: dict = {
"name": "C",
"expression": case["expression"],
"returnSpansFrom": case.get("return_spans_from", ""),
"limit": case.get("limit", 100),
}
if case.get("select_fields"):
spec["selectFields"] = case["select_fields"]
if case.get("order"):
spec["order"] = case["order"]
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=5,
headers={"authorization": f"Bearer {token}"},
json={
"schemaVersion": "v1",
"start": start_ms,
"end": end_ms,
"requestType": "raw",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {"name": "A", "signal": "traces", "filter": {"expression": case["filter_a"]}, "limit": 100},
},
{
"type": "builder_query",
"spec": {"name": "B", "signal": "traces", "filter": {"expression": case["filter_b"]}, "limit": 100},
},
{"type": "builder_trace_operator", "spec": spec},
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK, f"HTTP {response.status_code}: {response.text}"
assert case["validate"](response), f"validation failed: {response.json()}"