Compare commits

..

6 Commits

Author SHA1 Message Date
Piyush Singariya
abbb6f7137 revert: var name change 2026-05-26 21:11:48 +05:30
Piyush Singariya
c1f3dacc0b fix: remove old pipelines 2026-05-26 21:10:55 +05:30
Piyush Singariya
2067b0f634 revert: test name 2026-05-26 20:58:11 +05:30
Piyush Singariya
75df0c00ef Merge branch 'main' into order-pipelines 2026-05-26 19:09:15 +05:30
Piyush Singariya
d03ff055bb fix: preserve order of pipelines between memory_limiter and batch 2026-05-26 19:08:07 +05:30
Piyush Singariya
2b10fbd4dd chore: var names changed 2026-05-26 17:38:27 +05:30
3 changed files with 141 additions and 267 deletions

View File

@@ -219,25 +219,19 @@ func (m *module) GetStats(ctx context.Context, orgID valuer.UUID, req *metricsex
return nil, err
}
var (
metricStats []metricsexplorertypes.Stat
total uint64
err error
)
hasFilter := req.Filter != nil && strings.TrimSpace(req.Filter.Expression) != ""
if hasFilter {
var filterWhereClause *sqlbuilder.WhereClause
filterWhereClause, err = m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
if err != nil {
return nil, err
}
metricStats, total, err = m.fetchMetricsStatsWithSamples(ctx, req, filterWhereClause, false, req.OrderBy)
} else {
metricStats, total, err = m.fetchMetricsStatsWithSamplesFastPath(ctx, req, false, req.OrderBy)
filterWhereClause, err := m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
if err != nil {
return nil, err
}
// Single query to get stats with samples, timeseries counts in required sorting order
metricStats, total, err := m.fetchMetricsStatsWithSamples(
ctx,
req,
filterWhereClause,
false,
req.OrderBy,
)
if err != nil {
return nil, err
}
@@ -1091,109 +1085,6 @@ func (m *module) fetchMetricsStatsWithSamples(
return metricStats, total, nil
}
func (m *module) fetchMetricsStatsWithSamplesFastPath(
ctx context.Context,
req *metricsexplorertypes.StatsRequest,
normalized bool,
orderBy *qbtypes.OrderBy,
) ([]metricsexplorertypes.Stat, uint64, error) {
ctx = m.withMetricsExplorerContext(ctx, "fetchMetricsStatsWithSamplesFastPath")
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
// Timeseries counts per metric
tsSB := sqlbuilder.NewSelectBuilder()
tsSB.Select(
"metric_name",
"uniq(fingerprint) AS timeseries",
)
tsSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
tsSB.Where(tsSB.Between("unix_milli", start, end))
tsSB.Where("NOT startsWith(metric_name, 'signoz')")
tsSB.Where(tsSB.E("__normalized", normalized))
tsSB.GroupBy("metric_name")
// Distinct metric_names from local TS table — narrows samples scan on its leading sort key
metricNamesSB := sqlbuilder.NewSelectBuilder()
metricNamesSB.Select("DISTINCT metric_name")
metricNamesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTsTable))
metricNamesSB.Where(metricNamesSB.Between("unix_milli", start, end))
metricNamesSB.Where("NOT startsWith(metric_name, 'signoz')")
metricNamesSB.Where(metricNamesSB.E("__normalized", normalized))
// Samples counts per metric
samplesSB := sqlbuilder.NewSelectBuilder()
samplesSB.Select(
"metric_name",
fmt.Sprintf("%s AS samples", countExp),
)
samplesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
samplesSB.Where(samplesSB.Between("unix_milli", req.Start, req.End))
samplesSB.Where("NOT startsWith(metric_name, 'signoz')")
samplesSB.Where(fmt.Sprintf("metric_name IN (%s)", samplesSB.Var(metricNamesSB)))
samplesSB.GroupBy("metric_name")
cteBuilder := sqlbuilder.With(
sqlbuilder.CTEQuery("__time_series_counts").As(tsSB),
sqlbuilder.CTEQuery("__sample_counts").As(samplesSB),
)
finalSB := cteBuilder.Select(
"COALESCE(ts.metric_name, s.metric_name) AS metric_name",
"COALESCE(ts.timeseries, 0) AS timeseries",
"COALESCE(s.samples, 0) AS samples",
"COUNT(*) OVER() AS total",
)
finalSB.From("__time_series_counts ts")
finalSB.JoinWithOption(sqlbuilder.FullOuterJoin, "__sample_counts s", "ts.metric_name = s.metric_name")
finalSB.Where("(COALESCE(ts.timeseries, 0) > 0 OR COALESCE(s.samples, 0) > 0)")
orderByColumn, orderDirection, err := getStatsOrderByColumn(orderBy)
if err != nil {
return nil, 0, err
}
finalSB.OrderBy(
fmt.Sprintf("%s %s", orderByColumn, strings.ToUpper(orderDirection)),
"metric_name ASC",
)
finalSB.Limit(req.Limit)
finalSB.Offset(req.Offset)
query, args := finalSB.BuildWithFlavor(sqlbuilder.ClickHouse)
valueCtx := ctxtypes.SetClickhouseMaxThreads(ctx, m.config.TelemetryStore.Threads)
db := m.telemetryStore.ClickhouseDB()
rows, err := db.Query(valueCtx, query, args...)
if err != nil {
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to execute metrics stats with samples fastpath query")
}
defer rows.Close()
metricStats := make([]metricsexplorertypes.Stat, 0)
var total uint64
for rows.Next() {
var (
metricStat metricsexplorertypes.Stat
rowTotal uint64
)
if err := rows.Scan(&metricStat.MetricName, &metricStat.TimeSeries, &metricStat.Samples, &rowTotal); err != nil {
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metrics stats row")
}
metricStats = append(metricStats, metricStat)
total = rowTotal
}
if err := rows.Err(); err != nil {
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metrics stats rows")
}
return metricStats, total, nil
}
func (m *module) computeTimeseriesTreemap(ctx context.Context, req *metricsexplorertypes.TreemapRequest, filterWhereClause *sqlbuilder.WhereClause) ([]metricsexplorertypes.TreemapEntry, error) {
ctx = m.withMetricsExplorerContext(ctx, "computeTimeseriesTreemap")

View File

@@ -2,14 +2,11 @@ package logparsingpipeline
import (
"encoding/json"
"fmt"
"strings"
"sync"
"gopkg.in/yaml.v3"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
@@ -26,6 +23,13 @@ var (
CodeCollectorConfigLogsPipelineNotFound = errors.MustNewCode("collector_config_logs_pipeline_not_found")
)
const (
memoryLimiterProcessor = "memory_limiter"
memoryLimiterProcessorPrefix = "memory_limiter/"
batchProcessor = "batch"
batchProcessorPrefix = "batch/"
)
// check if the processors already exist
// if yes then update the processor.
// if something doesn't exists then remove it.
@@ -79,6 +83,14 @@ func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, er
return &p, nil
}
// buildCollectorPipelineProcessorsList assembles the final processor list in the
// required order:
//
// 1. memory_limiter processors (any processor named "memory_limiter" or "memory_limiter/<id>")
// 2. other existing processors (in their original order), which may include signoz processors
// that are not user-pipeline processors
// 3. signoz user-pipeline processors
// 4. batch processors (any processor named "batch" or "batch/<id>")
func buildCollectorPipelineProcessorsList(
currentCollectorProcessors []string,
signozPipelineProcessorNames []string,
@@ -86,90 +98,59 @@ func buildCollectorPipelineProcessorsList(
lockLogsPipelineSpec.Lock()
defer lockLogsPipelineSpec.Unlock()
exists := map[string]struct{}{}
for _, v := range signozPipelineProcessorNames {
exists[v] = struct{}{}
// Build a set of the desired signoz processors so we can drop any stale version
// of them (regardless of how they got into the current config) without
// accidentally duplicating them in the output.
desiredUserPipelineSet := make(map[string]struct{}, len(signozPipelineProcessorNames))
for _, p := range signozPipelineProcessorNames {
desiredUserPipelineSet[p] = struct{}{}
}
// removed the old processors which are not used
var pipeline []string
for _, procName := range currentCollectorProcessors {
_, isInDesiredPipelineProcs := exists[procName]
if isInDesiredPipelineProcs || !hasSignozPipelineProcessorPrefix(procName) {
pipeline = append(pipeline, procName)
}
}
result := make([]string, 0, len(currentCollectorProcessors)+len(signozPipelineProcessorNames))
// create a reverse map of existing config processors and their position
existing := map[string]int{}
for i, p := range pipeline {
name := p
existing[name] = i
}
// create mapping from our logsParserPipeline to position in existing processors (from current config)
// this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3
specVsExistingMap := map[int]int{}
existingVsSpec := map[int]int{}
// go through plan and map its elements to current positions in effective config
for i, m := range signozPipelineProcessorNames {
if loc, ok := existing[m]; ok {
specVsExistingMap[i] = loc
existingVsSpec[loc] = i
}
}
lastMatched := 0
newPipeline := []string{}
for i := 0; i < len(signozPipelineProcessorNames); i++ {
m := signozPipelineProcessorNames[i]
if loc, ok := specVsExistingMap[i]; ok {
for j := lastMatched; j < loc; j++ {
if hasSignozPipelineProcessorPrefix(pipeline[j]) {
delete(specVsExistingMap, existingVsSpec[j])
} else {
newPipeline = append(newPipeline, pipeline[j])
}
// Note: logic assumes there'll be only one batch processor
var batchProcIdx int
var batchProcFound bool
iteration:
for idx, p := range currentCollectorProcessors {
_, inDesiredSet := desiredUserPipelineSet[p]
switch {
// same processor exist; retain the location of pre-existing location
case p == memoryLimiterProcessor || strings.HasPrefix(p, memoryLimiterProcessorPrefix):
result = append(result, p)
case hasSignozPipelineProcessorPrefix(p):
// this processor has been dropped
if !inDesiredSet {
continue iteration
} else {
result = append(result, p)
}
newPipeline = append(newPipeline, pipeline[loc])
lastMatched = loc + 1
} else {
newPipeline = append(newPipeline, m)
case p == batchProcessor || strings.HasPrefix(p, batchProcessorPrefix):
batchProcIdx = idx
batchProcFound = true
break iteration
default:
result = append(result, p)
}
}
if lastMatched < len(pipeline) {
newPipeline = append(newPipeline, pipeline[lastMatched:]...)
}
if checkDuplicateString(newPipeline) {
// duplicates are most likely because the processor sequence in effective config conflicts
// with the planned sequence as per planned pipeline
return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline)
}
return newPipeline, nil
}
func checkDuplicateString(pipeline []string) bool {
exists := make(map[string]bool, len(pipeline))
slog.Debug("checking duplicate processors in the pipeline", "pipeline", pipeline)
for _, processor := range pipeline {
name := processor
if _, ok := exists[name]; ok {
slog.Error(
"duplicate processor name detected in generated collector config for log pipelines",
"processor", processor,
"pipeline", pipeline,
)
return true
if inDesiredSet {
// delete from desired pipeline set so they're not added twice
delete(desiredUserPipelineSet, p)
}
exists[name] = true
}
return false
// add user pipelines
for _, proc := range signozPipelineProcessorNames {
_, add := desiredUserPipelineSet[proc]
if add {
result = append(result, proc)
}
}
// add batch processor and rest
if batchProcFound {
result = append(result, currentCollectorProcessors[batchProcIdx:]...)
}
return result, nil
}
func GenerateCollectorConfigWithPipelines(config []byte, pipelines []pipelinetypes.GettablePipeline) ([]byte, error) {

View File

@@ -106,107 +106,109 @@ func TestBuildLogParsingProcessors(t *testing.T) {
}
var BuildLogsPipelineTestData = []struct {
Name string
currentPipeline []string
logsPipeline []string
expectedPipeline []string
Name string
fromCollector []string
userPipelines []string
finalOutput []string
}{
{
Name: "Add new pipelines",
currentPipeline: []string{"processor1", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
Name: "Add new pipelines",
fromCollector: []string{"processor1", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
},
{
Name: "Add new pipeline and respect custom processors",
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"},
Name: "Add new pipeline and respect custom processors",
fromCollector: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2", constants.LogsPPLPfx + "c"},
},
{
Name: "Add new pipeline and respect custom processors",
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d", "processor2"},
Name: "Add new pipeline and respect custom processors",
fromCollector: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
},
{
Name: "Add new pipeline and respect custom processors in the beginning and middle",
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
Name: "Add new pipeline and respect custom processors in the beginning and middle",
fromCollector: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
finalOutput: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
},
{
Name: "Remove old pipeline add add new",
currentPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
logsPipeline: []string{constants.LogsPPLPfx + "a"},
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
Name: "Remove old pipeline add add new",
fromCollector: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
userPipelines: []string{constants.LogsPPLPfx + "a"},
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
},
{
Name: "Remove old pipeline from middle",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a"},
expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
Name: "Remove old pipeline from middle",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
},
{
Name: "Remove old pipeline from middle and add new pipeline",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"},
Name: "Remove old pipeline from middle and add new pipeline",
fromCollector: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
finalOutput: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "c", "batch"},
},
{
Name: "Remove multiple old pipelines from middle and add multiple new ones",
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"},
},
// working
{
Name: "rearrange pipelines",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch"},
Name: "Remove multiple old pipelines from middle and add multiple new ones",
fromCollector: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
finalOutput: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", "processor3", constants.LogsPPLPfx + "c", "processor4", "processor5", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c1", "batch"},
},
{
Name: "rearrange pipelines with new processor",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
// expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
Name: "rearrange pipelines",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
},
{
Name: "delete processor",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
logsPipeline: []string{},
expectedPipeline: []string{"processor1", "processor2", "processor3", "batch"},
Name: "rearrange pipelines with new processor",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"},
},
{
Name: "last to first",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
logsPipeline: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
expectedPipeline: []string{"processor1", "processor2", "processor3", "processor4", "batch", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
Name: "delete processor",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
userPipelines: []string{},
finalOutput: []string{"processor1", "processor2", "processor3", "batch"},
},
{
Name: "multiple rearrange pipelines",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
Name: "last to first",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
userPipelines: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch", constants.LogsPPLPfx + "_c"},
},
{
Name: "multiple rearrange with new pipelines",
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
logsPipeline: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
expectedPipeline: []string{constants.LogsPPLPfx + "_z", "processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
Name: "multiple rearrange pipelines",
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
},
{
Name: "multiple rearrange with new pipelines",
fromCollector: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
userPipelines: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
finalOutput: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
},
{
Name: "Prefixed proc in desired set not duplicated from others",
fromCollector: []string{"memory_limiter/logs", "custom_proc", "resourcedetection", "batch/logs"},
userPipelines: []string{"custom_proc", constants.LogsPPLPfx + "a"},
finalOutput: []string{"memory_limiter/logs", "custom_proc", "resourcedetection", constants.LogsPPLPfx + "a", "batch/logs"},
},
}
func TestBuildLogsPipeline(t *testing.T) {
for _, test := range BuildLogsPipelineTestData {
Convey(test.Name, t, func() {
v, err := buildCollectorPipelineProcessorsList(test.currentPipeline, test.logsPipeline)
v, err := buildCollectorPipelineProcessorsList(test.fromCollector, test.userPipelines)
So(err, ShouldBeNil)
fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
So(v, ShouldResemble, test.expectedPipeline)
So(v, ShouldResemble, test.finalOutput)
})
}
}