mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-26 20:00:33 +01:00
Compare commits
6 Commits
metricsExp
...
order-pipe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
abbb6f7137 | ||
|
|
c1f3dacc0b | ||
|
|
2067b0f634 | ||
|
|
75df0c00ef | ||
|
|
d03ff055bb | ||
|
|
2b10fbd4dd |
@@ -2,14 +2,11 @@ package logparsingpipeline
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
"log/slog"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/constants"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
@@ -26,6 +23,13 @@ var (
|
||||
CodeCollectorConfigLogsPipelineNotFound = errors.MustNewCode("collector_config_logs_pipeline_not_found")
|
||||
)
|
||||
|
||||
const (
|
||||
memoryLimiterProcessor = "memory_limiter"
|
||||
memoryLimiterProcessorPrefix = "memory_limiter/"
|
||||
batchProcessor = "batch"
|
||||
batchProcessorPrefix = "batch/"
|
||||
)
|
||||
|
||||
// check if the processors already exist
|
||||
// if yes then update the processor.
|
||||
// if something doesn't exists then remove it.
|
||||
@@ -79,6 +83,14 @@ func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, er
|
||||
return &p, nil
|
||||
}
|
||||
|
||||
// buildCollectorPipelineProcessorsList assembles the final processor list in the
|
||||
// required order:
|
||||
//
|
||||
// 1. memory_limiter processors (any processor named "memory_limiter" or "memory_limiter/<id>")
|
||||
// 2. other existing processors (in their original order), which may include signoz processors
|
||||
// that are not user-pipeline processors
|
||||
// 3. signoz user-pipeline processors
|
||||
// 4. batch processors (any processor named "batch" or "batch/<id>")
|
||||
func buildCollectorPipelineProcessorsList(
|
||||
currentCollectorProcessors []string,
|
||||
signozPipelineProcessorNames []string,
|
||||
@@ -86,90 +98,59 @@ func buildCollectorPipelineProcessorsList(
|
||||
lockLogsPipelineSpec.Lock()
|
||||
defer lockLogsPipelineSpec.Unlock()
|
||||
|
||||
exists := map[string]struct{}{}
|
||||
for _, v := range signozPipelineProcessorNames {
|
||||
exists[v] = struct{}{}
|
||||
// Build a set of the desired signoz processors so we can drop any stale version
|
||||
// of them (regardless of how they got into the current config) without
|
||||
// accidentally duplicating them in the output.
|
||||
desiredUserPipelineSet := make(map[string]struct{}, len(signozPipelineProcessorNames))
|
||||
for _, p := range signozPipelineProcessorNames {
|
||||
desiredUserPipelineSet[p] = struct{}{}
|
||||
}
|
||||
|
||||
// removed the old processors which are not used
|
||||
var pipeline []string
|
||||
for _, procName := range currentCollectorProcessors {
|
||||
_, isInDesiredPipelineProcs := exists[procName]
|
||||
if isInDesiredPipelineProcs || !hasSignozPipelineProcessorPrefix(procName) {
|
||||
pipeline = append(pipeline, procName)
|
||||
}
|
||||
}
|
||||
result := make([]string, 0, len(currentCollectorProcessors)+len(signozPipelineProcessorNames))
|
||||
|
||||
// create a reverse map of existing config processors and their position
|
||||
existing := map[string]int{}
|
||||
for i, p := range pipeline {
|
||||
name := p
|
||||
existing[name] = i
|
||||
}
|
||||
|
||||
// create mapping from our logsParserPipeline to position in existing processors (from current config)
|
||||
// this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3
|
||||
specVsExistingMap := map[int]int{}
|
||||
existingVsSpec := map[int]int{}
|
||||
|
||||
// go through plan and map its elements to current positions in effective config
|
||||
for i, m := range signozPipelineProcessorNames {
|
||||
if loc, ok := existing[m]; ok {
|
||||
specVsExistingMap[i] = loc
|
||||
existingVsSpec[loc] = i
|
||||
}
|
||||
}
|
||||
|
||||
lastMatched := 0
|
||||
newPipeline := []string{}
|
||||
|
||||
for i := 0; i < len(signozPipelineProcessorNames); i++ {
|
||||
m := signozPipelineProcessorNames[i]
|
||||
if loc, ok := specVsExistingMap[i]; ok {
|
||||
for j := lastMatched; j < loc; j++ {
|
||||
if hasSignozPipelineProcessorPrefix(pipeline[j]) {
|
||||
delete(specVsExistingMap, existingVsSpec[j])
|
||||
} else {
|
||||
newPipeline = append(newPipeline, pipeline[j])
|
||||
}
|
||||
// Note: logic assumes there'll be only one batch processor
|
||||
var batchProcIdx int
|
||||
var batchProcFound bool
|
||||
iteration:
|
||||
for idx, p := range currentCollectorProcessors {
|
||||
_, inDesiredSet := desiredUserPipelineSet[p]
|
||||
switch {
|
||||
// same processor exist; retain the location of pre-existing location
|
||||
case p == memoryLimiterProcessor || strings.HasPrefix(p, memoryLimiterProcessorPrefix):
|
||||
result = append(result, p)
|
||||
case hasSignozPipelineProcessorPrefix(p):
|
||||
// this processor has been dropped
|
||||
if !inDesiredSet {
|
||||
continue iteration
|
||||
} else {
|
||||
result = append(result, p)
|
||||
}
|
||||
newPipeline = append(newPipeline, pipeline[loc])
|
||||
lastMatched = loc + 1
|
||||
} else {
|
||||
newPipeline = append(newPipeline, m)
|
||||
case p == batchProcessor || strings.HasPrefix(p, batchProcessorPrefix):
|
||||
batchProcIdx = idx
|
||||
batchProcFound = true
|
||||
break iteration
|
||||
default:
|
||||
result = append(result, p)
|
||||
}
|
||||
|
||||
}
|
||||
if lastMatched < len(pipeline) {
|
||||
newPipeline = append(newPipeline, pipeline[lastMatched:]...)
|
||||
}
|
||||
|
||||
if checkDuplicateString(newPipeline) {
|
||||
// duplicates are most likely because the processor sequence in effective config conflicts
|
||||
// with the planned sequence as per planned pipeline
|
||||
return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline)
|
||||
}
|
||||
|
||||
return newPipeline, nil
|
||||
}
|
||||
|
||||
func checkDuplicateString(pipeline []string) bool {
|
||||
exists := make(map[string]bool, len(pipeline))
|
||||
slog.Debug("checking duplicate processors in the pipeline", "pipeline", pipeline)
|
||||
for _, processor := range pipeline {
|
||||
name := processor
|
||||
if _, ok := exists[name]; ok {
|
||||
slog.Error(
|
||||
"duplicate processor name detected in generated collector config for log pipelines",
|
||||
"processor", processor,
|
||||
"pipeline", pipeline,
|
||||
)
|
||||
return true
|
||||
if inDesiredSet {
|
||||
// delete from desired pipeline set so they're not added twice
|
||||
delete(desiredUserPipelineSet, p)
|
||||
}
|
||||
|
||||
exists[name] = true
|
||||
}
|
||||
return false
|
||||
// add user pipelines
|
||||
for _, proc := range signozPipelineProcessorNames {
|
||||
_, add := desiredUserPipelineSet[proc]
|
||||
if add {
|
||||
result = append(result, proc)
|
||||
}
|
||||
}
|
||||
|
||||
// add batch processor and rest
|
||||
if batchProcFound {
|
||||
result = append(result, currentCollectorProcessors[batchProcIdx:]...)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func GenerateCollectorConfigWithPipelines(config []byte, pipelines []pipelinetypes.GettablePipeline) ([]byte, error) {
|
||||
|
||||
@@ -106,107 +106,109 @@ func TestBuildLogParsingProcessors(t *testing.T) {
|
||||
}
|
||||
|
||||
var BuildLogsPipelineTestData = []struct {
|
||||
Name string
|
||||
currentPipeline []string
|
||||
logsPipeline []string
|
||||
expectedPipeline []string
|
||||
Name string
|
||||
fromCollector []string
|
||||
userPipelines []string
|
||||
finalOutput []string
|
||||
}{
|
||||
{
|
||||
Name: "Add new pipelines",
|
||||
currentPipeline: []string{"processor1", "processor2"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
|
||||
expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
|
||||
Name: "Add new pipelines",
|
||||
fromCollector: []string{"processor1", "processor2"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
|
||||
},
|
||||
{
|
||||
Name: "Add new pipeline and respect custom processors",
|
||||
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
|
||||
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"},
|
||||
Name: "Add new pipeline and respect custom processors",
|
||||
fromCollector: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
|
||||
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2", constants.LogsPPLPfx + "c"},
|
||||
},
|
||||
{
|
||||
Name: "Add new pipeline and respect custom processors",
|
||||
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
|
||||
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d", "processor2"},
|
||||
Name: "Add new pipeline and respect custom processors",
|
||||
fromCollector: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
|
||||
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
|
||||
},
|
||||
{
|
||||
Name: "Add new pipeline and respect custom processors in the beginning and middle",
|
||||
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
|
||||
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
|
||||
Name: "Add new pipeline and respect custom processors in the beginning and middle",
|
||||
fromCollector: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
|
||||
finalOutput: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "Remove old pipeline add add new",
|
||||
currentPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a"},
|
||||
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
|
||||
Name: "Remove old pipeline add add new",
|
||||
fromCollector: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a"},
|
||||
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
|
||||
},
|
||||
{
|
||||
Name: "Remove old pipeline from middle",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a"},
|
||||
expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
|
||||
Name: "Remove old pipeline from middle",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "Remove old pipeline from middle and add new pipeline",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
|
||||
expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"},
|
||||
Name: "Remove old pipeline from middle and add new pipeline",
|
||||
fromCollector: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
|
||||
finalOutput: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "c", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "Remove multiple old pipelines from middle and add multiple new ones",
|
||||
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
|
||||
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"},
|
||||
},
|
||||
|
||||
// working
|
||||
{
|
||||
Name: "rearrange pipelines",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
|
||||
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch"},
|
||||
Name: "Remove multiple old pipelines from middle and add multiple new ones",
|
||||
fromCollector: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
|
||||
finalOutput: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", "processor3", constants.LogsPPLPfx + "c", "processor4", "processor5", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c1", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "rearrange pipelines with new processor",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
|
||||
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
|
||||
// expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
|
||||
Name: "rearrange pipelines",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "delete processor",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
logsPipeline: []string{},
|
||||
expectedPipeline: []string{"processor1", "processor2", "processor3", "batch"},
|
||||
Name: "rearrange pipelines with new processor",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "last to first",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
|
||||
expectedPipeline: []string{"processor1", "processor2", "processor3", "processor4", "batch", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
|
||||
Name: "delete processor",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
userPipelines: []string{},
|
||||
finalOutput: []string{"processor1", "processor2", "processor3", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "multiple rearrange pipelines",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
|
||||
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
|
||||
Name: "last to first",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch", constants.LogsPPLPfx + "_c"},
|
||||
},
|
||||
{
|
||||
Name: "multiple rearrange with new pipelines",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
|
||||
expectedPipeline: []string{constants.LogsPPLPfx + "_z", "processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
|
||||
Name: "multiple rearrange pipelines",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
},
|
||||
{
|
||||
Name: "multiple rearrange with new pipelines",
|
||||
fromCollector: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
|
||||
finalOutput: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
},
|
||||
{
|
||||
Name: "Prefixed proc in desired set not duplicated from others",
|
||||
fromCollector: []string{"memory_limiter/logs", "custom_proc", "resourcedetection", "batch/logs"},
|
||||
userPipelines: []string{"custom_proc", constants.LogsPPLPfx + "a"},
|
||||
finalOutput: []string{"memory_limiter/logs", "custom_proc", "resourcedetection", constants.LogsPPLPfx + "a", "batch/logs"},
|
||||
},
|
||||
}
|
||||
|
||||
func TestBuildLogsPipeline(t *testing.T) {
|
||||
for _, test := range BuildLogsPipelineTestData {
|
||||
Convey(test.Name, t, func() {
|
||||
v, err := buildCollectorPipelineProcessorsList(test.currentPipeline, test.logsPipeline)
|
||||
v, err := buildCollectorPipelineProcessorsList(test.fromCollector, test.userPipelines)
|
||||
So(err, ShouldBeNil)
|
||||
fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
|
||||
So(v, ShouldResemble, test.expectedPipeline)
|
||||
So(v, ShouldResemble, test.finalOutput)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user