mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-26 20:00:33 +01:00
Compare commits
2 Commits
order-pipe
...
metricsExp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
05e84b04cc | ||
|
|
def414c7bb |
@@ -219,19 +219,25 @@ func (m *module) GetStats(ctx context.Context, orgID valuer.UUID, req *metricsex
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filterWhereClause, err := m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var (
|
||||
metricStats []metricsexplorertypes.Stat
|
||||
total uint64
|
||||
err error
|
||||
)
|
||||
|
||||
hasFilter := req.Filter != nil && strings.TrimSpace(req.Filter.Expression) != ""
|
||||
|
||||
if hasFilter {
|
||||
var filterWhereClause *sqlbuilder.WhereClause
|
||||
filterWhereClause, err = m.buildFilterClause(ctx, req.Filter, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metricStats, total, err = m.fetchMetricsStatsWithSamples(ctx, req, filterWhereClause, false, req.OrderBy)
|
||||
} else {
|
||||
metricStats, total, err = m.fetchMetricsStatsWithSamplesFastPath(ctx, req, false, req.OrderBy)
|
||||
}
|
||||
|
||||
// Single query to get stats with samples, timeseries counts in required sorting order
|
||||
metricStats, total, err := m.fetchMetricsStatsWithSamples(
|
||||
ctx,
|
||||
req,
|
||||
filterWhereClause,
|
||||
false,
|
||||
req.OrderBy,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1085,6 +1091,109 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
return metricStats, total, nil
|
||||
}
|
||||
|
||||
func (m *module) fetchMetricsStatsWithSamplesFastPath(
|
||||
ctx context.Context,
|
||||
req *metricsexplorertypes.StatsRequest,
|
||||
normalized bool,
|
||||
orderBy *qbtypes.OrderBy,
|
||||
) ([]metricsexplorertypes.Stat, uint64, error) {
|
||||
ctx = m.withMetricsExplorerContext(ctx, "fetchMetricsStatsWithSamplesFastPath")
|
||||
|
||||
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
|
||||
|
||||
// Timeseries counts per metric
|
||||
tsSB := sqlbuilder.NewSelectBuilder()
|
||||
tsSB.Select(
|
||||
"metric_name",
|
||||
"uniq(fingerprint) AS timeseries",
|
||||
)
|
||||
tsSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
|
||||
tsSB.Where(tsSB.Between("unix_milli", start, end))
|
||||
tsSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
tsSB.Where(tsSB.E("__normalized", normalized))
|
||||
tsSB.GroupBy("metric_name")
|
||||
|
||||
// Distinct metric_names from local TS table — narrows samples scan on its leading sort key
|
||||
metricNamesSB := sqlbuilder.NewSelectBuilder()
|
||||
metricNamesSB.Select("DISTINCT metric_name")
|
||||
metricNamesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, localTsTable))
|
||||
metricNamesSB.Where(metricNamesSB.Between("unix_milli", start, end))
|
||||
metricNamesSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
metricNamesSB.Where(metricNamesSB.E("__normalized", normalized))
|
||||
|
||||
// Samples counts per metric
|
||||
samplesSB := sqlbuilder.NewSelectBuilder()
|
||||
samplesSB.Select(
|
||||
"metric_name",
|
||||
fmt.Sprintf("%s AS samples", countExp),
|
||||
)
|
||||
samplesSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, samplesTable))
|
||||
samplesSB.Where(samplesSB.Between("unix_milli", req.Start, req.End))
|
||||
samplesSB.Where("NOT startsWith(metric_name, 'signoz')")
|
||||
samplesSB.Where(fmt.Sprintf("metric_name IN (%s)", samplesSB.Var(metricNamesSB)))
|
||||
samplesSB.GroupBy("metric_name")
|
||||
|
||||
cteBuilder := sqlbuilder.With(
|
||||
sqlbuilder.CTEQuery("__time_series_counts").As(tsSB),
|
||||
sqlbuilder.CTEQuery("__sample_counts").As(samplesSB),
|
||||
)
|
||||
|
||||
finalSB := cteBuilder.Select(
|
||||
"COALESCE(ts.metric_name, s.metric_name) AS metric_name",
|
||||
"COALESCE(ts.timeseries, 0) AS timeseries",
|
||||
"COALESCE(s.samples, 0) AS samples",
|
||||
"COUNT(*) OVER() AS total",
|
||||
)
|
||||
finalSB.From("__time_series_counts ts")
|
||||
finalSB.JoinWithOption(sqlbuilder.FullOuterJoin, "__sample_counts s", "ts.metric_name = s.metric_name")
|
||||
finalSB.Where("(COALESCE(ts.timeseries, 0) > 0 OR COALESCE(s.samples, 0) > 0)")
|
||||
|
||||
orderByColumn, orderDirection, err := getStatsOrderByColumn(orderBy)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
finalSB.OrderBy(
|
||||
fmt.Sprintf("%s %s", orderByColumn, strings.ToUpper(orderDirection)),
|
||||
"metric_name ASC",
|
||||
)
|
||||
finalSB.Limit(req.Limit)
|
||||
finalSB.Offset(req.Offset)
|
||||
|
||||
query, args := finalSB.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
valueCtx := ctxtypes.SetClickhouseMaxThreads(ctx, m.config.TelemetryStore.Threads)
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
rows, err := db.Query(valueCtx, query, args...)
|
||||
if err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to execute metrics stats with samples fastpath query")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
metricStats := make([]metricsexplorertypes.Stat, 0)
|
||||
var total uint64
|
||||
|
||||
for rows.Next() {
|
||||
var (
|
||||
metricStat metricsexplorertypes.Stat
|
||||
rowTotal uint64
|
||||
)
|
||||
if err := rows.Scan(&metricStat.MetricName, &metricStat.TimeSeries, &metricStat.Samples, &rowTotal); err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metrics stats row")
|
||||
}
|
||||
metricStats = append(metricStats, metricStat)
|
||||
total = rowTotal
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metrics stats rows")
|
||||
}
|
||||
|
||||
return metricStats, total, nil
|
||||
}
|
||||
|
||||
func (m *module) computeTimeseriesTreemap(ctx context.Context, req *metricsexplorertypes.TreemapRequest, filterWhereClause *sqlbuilder.WhereClause) ([]metricsexplorertypes.TreemapEntry, error) {
|
||||
ctx = m.withMetricsExplorerContext(ctx, "computeTimeseriesTreemap")
|
||||
|
||||
|
||||
@@ -2,11 +2,14 @@ package logparsingpipeline
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
"log/slog"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/constants"
|
||||
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
||||
@@ -23,13 +26,6 @@ var (
|
||||
CodeCollectorConfigLogsPipelineNotFound = errors.MustNewCode("collector_config_logs_pipeline_not_found")
|
||||
)
|
||||
|
||||
const (
|
||||
memoryLimiterProcessor = "memory_limiter"
|
||||
memoryLimiterProcessorPrefix = "memory_limiter/"
|
||||
batchProcessor = "batch"
|
||||
batchProcessorPrefix = "batch/"
|
||||
)
|
||||
|
||||
// check if the processors already exist
|
||||
// if yes then update the processor.
|
||||
// if something doesn't exists then remove it.
|
||||
@@ -83,14 +79,6 @@ func getOtelPipelineFromConfig(config map[string]interface{}) (*otelPipeline, er
|
||||
return &p, nil
|
||||
}
|
||||
|
||||
// buildCollectorPipelineProcessorsList assembles the final processor list in the
|
||||
// required order:
|
||||
//
|
||||
// 1. memory_limiter processors (any processor named "memory_limiter" or "memory_limiter/<id>")
|
||||
// 2. other existing processors (in their original order), which may include signoz processors
|
||||
// that are not user-pipeline processors
|
||||
// 3. signoz user-pipeline processors
|
||||
// 4. batch processors (any processor named "batch" or "batch/<id>")
|
||||
func buildCollectorPipelineProcessorsList(
|
||||
currentCollectorProcessors []string,
|
||||
signozPipelineProcessorNames []string,
|
||||
@@ -98,59 +86,90 @@ func buildCollectorPipelineProcessorsList(
|
||||
lockLogsPipelineSpec.Lock()
|
||||
defer lockLogsPipelineSpec.Unlock()
|
||||
|
||||
// Build a set of the desired signoz processors so we can drop any stale version
|
||||
// of them (regardless of how they got into the current config) without
|
||||
// accidentally duplicating them in the output.
|
||||
desiredUserPipelineSet := make(map[string]struct{}, len(signozPipelineProcessorNames))
|
||||
for _, p := range signozPipelineProcessorNames {
|
||||
desiredUserPipelineSet[p] = struct{}{}
|
||||
exists := map[string]struct{}{}
|
||||
for _, v := range signozPipelineProcessorNames {
|
||||
exists[v] = struct{}{}
|
||||
}
|
||||
|
||||
result := make([]string, 0, len(currentCollectorProcessors)+len(signozPipelineProcessorNames))
|
||||
// removed the old processors which are not used
|
||||
var pipeline []string
|
||||
for _, procName := range currentCollectorProcessors {
|
||||
_, isInDesiredPipelineProcs := exists[procName]
|
||||
if isInDesiredPipelineProcs || !hasSignozPipelineProcessorPrefix(procName) {
|
||||
pipeline = append(pipeline, procName)
|
||||
}
|
||||
}
|
||||
|
||||
// Note: logic assumes there'll be only one batch processor
|
||||
var batchProcIdx int
|
||||
var batchProcFound bool
|
||||
iteration:
|
||||
for idx, p := range currentCollectorProcessors {
|
||||
_, inDesiredSet := desiredUserPipelineSet[p]
|
||||
switch {
|
||||
// same processor exist; retain the location of pre-existing location
|
||||
case p == memoryLimiterProcessor || strings.HasPrefix(p, memoryLimiterProcessorPrefix):
|
||||
result = append(result, p)
|
||||
case hasSignozPipelineProcessorPrefix(p):
|
||||
// this processor has been dropped
|
||||
if !inDesiredSet {
|
||||
continue iteration
|
||||
} else {
|
||||
result = append(result, p)
|
||||
// create a reverse map of existing config processors and their position
|
||||
existing := map[string]int{}
|
||||
for i, p := range pipeline {
|
||||
name := p
|
||||
existing[name] = i
|
||||
}
|
||||
|
||||
// create mapping from our logsParserPipeline to position in existing processors (from current config)
|
||||
// this means, if "batch" holds position 3 in the current effective config, and 2 in our config, the map will be [2]: 3
|
||||
specVsExistingMap := map[int]int{}
|
||||
existingVsSpec := map[int]int{}
|
||||
|
||||
// go through plan and map its elements to current positions in effective config
|
||||
for i, m := range signozPipelineProcessorNames {
|
||||
if loc, ok := existing[m]; ok {
|
||||
specVsExistingMap[i] = loc
|
||||
existingVsSpec[loc] = i
|
||||
}
|
||||
}
|
||||
|
||||
lastMatched := 0
|
||||
newPipeline := []string{}
|
||||
|
||||
for i := 0; i < len(signozPipelineProcessorNames); i++ {
|
||||
m := signozPipelineProcessorNames[i]
|
||||
if loc, ok := specVsExistingMap[i]; ok {
|
||||
for j := lastMatched; j < loc; j++ {
|
||||
if hasSignozPipelineProcessorPrefix(pipeline[j]) {
|
||||
delete(specVsExistingMap, existingVsSpec[j])
|
||||
} else {
|
||||
newPipeline = append(newPipeline, pipeline[j])
|
||||
}
|
||||
}
|
||||
case p == batchProcessor || strings.HasPrefix(p, batchProcessorPrefix):
|
||||
batchProcIdx = idx
|
||||
batchProcFound = true
|
||||
break iteration
|
||||
default:
|
||||
result = append(result, p)
|
||||
newPipeline = append(newPipeline, pipeline[loc])
|
||||
lastMatched = loc + 1
|
||||
} else {
|
||||
newPipeline = append(newPipeline, m)
|
||||
}
|
||||
|
||||
if inDesiredSet {
|
||||
// delete from desired pipeline set so they're not added twice
|
||||
delete(desiredUserPipelineSet, p)
|
||||
}
|
||||
}
|
||||
// add user pipelines
|
||||
for _, proc := range signozPipelineProcessorNames {
|
||||
_, add := desiredUserPipelineSet[proc]
|
||||
if add {
|
||||
result = append(result, proc)
|
||||
}
|
||||
if lastMatched < len(pipeline) {
|
||||
newPipeline = append(newPipeline, pipeline[lastMatched:]...)
|
||||
}
|
||||
|
||||
// add batch processor and rest
|
||||
if batchProcFound {
|
||||
result = append(result, currentCollectorProcessors[batchProcIdx:]...)
|
||||
if checkDuplicateString(newPipeline) {
|
||||
// duplicates are most likely because the processor sequence in effective config conflicts
|
||||
// with the planned sequence as per planned pipeline
|
||||
return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline)
|
||||
}
|
||||
return result, nil
|
||||
|
||||
return newPipeline, nil
|
||||
}
|
||||
|
||||
func checkDuplicateString(pipeline []string) bool {
|
||||
exists := make(map[string]bool, len(pipeline))
|
||||
slog.Debug("checking duplicate processors in the pipeline", "pipeline", pipeline)
|
||||
for _, processor := range pipeline {
|
||||
name := processor
|
||||
if _, ok := exists[name]; ok {
|
||||
slog.Error(
|
||||
"duplicate processor name detected in generated collector config for log pipelines",
|
||||
"processor", processor,
|
||||
"pipeline", pipeline,
|
||||
)
|
||||
return true
|
||||
}
|
||||
|
||||
exists[name] = true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func GenerateCollectorConfigWithPipelines(config []byte, pipelines []pipelinetypes.GettablePipeline) ([]byte, error) {
|
||||
|
||||
@@ -106,109 +106,107 @@ func TestBuildLogParsingProcessors(t *testing.T) {
|
||||
}
|
||||
|
||||
var BuildLogsPipelineTestData = []struct {
|
||||
Name string
|
||||
fromCollector []string
|
||||
userPipelines []string
|
||||
finalOutput []string
|
||||
Name string
|
||||
currentPipeline []string
|
||||
logsPipeline []string
|
||||
expectedPipeline []string
|
||||
}{
|
||||
{
|
||||
Name: "Add new pipelines",
|
||||
fromCollector: []string{"processor1", "processor2"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
|
||||
Name: "Add new pipelines",
|
||||
currentPipeline: []string{"processor1", "processor2"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
|
||||
expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
|
||||
},
|
||||
{
|
||||
Name: "Add new pipeline and respect custom processors",
|
||||
fromCollector: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
|
||||
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2", constants.LogsPPLPfx + "c"},
|
||||
Name: "Add new pipeline and respect custom processors",
|
||||
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
|
||||
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"},
|
||||
},
|
||||
{
|
||||
Name: "Add new pipeline and respect custom processors",
|
||||
fromCollector: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
|
||||
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
|
||||
Name: "Add new pipeline and respect custom processors",
|
||||
currentPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
|
||||
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d", "processor2"},
|
||||
},
|
||||
{
|
||||
Name: "Add new pipeline and respect custom processors in the beginning and middle",
|
||||
fromCollector: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
|
||||
finalOutput: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
|
||||
Name: "Add new pipeline and respect custom processors in the beginning and middle",
|
||||
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
|
||||
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "Remove old pipeline add add new",
|
||||
fromCollector: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a"},
|
||||
finalOutput: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
|
||||
Name: "Remove old pipeline add add new",
|
||||
currentPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a"},
|
||||
expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
|
||||
},
|
||||
{
|
||||
Name: "Remove old pipeline from middle",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
|
||||
Name: "Remove old pipeline from middle",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a"},
|
||||
expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "Remove old pipeline from middle and add new pipeline",
|
||||
fromCollector: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
|
||||
finalOutput: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "c", "batch"},
|
||||
Name: "Remove old pipeline from middle and add new pipeline",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
|
||||
expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "Remove multiple old pipelines from middle and add multiple new ones",
|
||||
fromCollector: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
|
||||
finalOutput: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", "processor3", constants.LogsPPLPfx + "c", "processor4", "processor5", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c1", "batch"},
|
||||
Name: "Remove multiple old pipelines from middle and add multiple new ones",
|
||||
currentPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
|
||||
expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"},
|
||||
},
|
||||
|
||||
// working
|
||||
{
|
||||
Name: "rearrange pipelines",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
|
||||
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "rearrange pipelines",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
Name: "rearrange pipelines with new processor",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
|
||||
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
|
||||
// expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "rearrange pipelines with new processor",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch"},
|
||||
Name: "delete processor",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
logsPipeline: []string{},
|
||||
expectedPipeline: []string{"processor1", "processor2", "processor3", "batch"},
|
||||
},
|
||||
{
|
||||
Name: "delete processor",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
|
||||
userPipelines: []string{},
|
||||
finalOutput: []string{"processor1", "processor2", "processor3", "batch"},
|
||||
Name: "last to first",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
|
||||
expectedPipeline: []string{"processor1", "processor2", "processor3", "processor4", "batch", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
|
||||
},
|
||||
{
|
||||
Name: "last to first",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_c", "batch", constants.LogsPPLPfx + "_c"},
|
||||
Name: "multiple rearrange pipelines",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
|
||||
expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
|
||||
},
|
||||
{
|
||||
Name: "multiple rearrange pipelines",
|
||||
fromCollector: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
|
||||
finalOutput: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
},
|
||||
{
|
||||
Name: "multiple rearrange with new pipelines",
|
||||
fromCollector: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
userPipelines: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
|
||||
finalOutput: []string{"memory_limiter", "processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
},
|
||||
{
|
||||
Name: "Prefixed proc in desired set not duplicated from others",
|
||||
fromCollector: []string{"memory_limiter/logs", "custom_proc", "resourcedetection", "batch/logs"},
|
||||
userPipelines: []string{"custom_proc", constants.LogsPPLPfx + "a"},
|
||||
finalOutput: []string{"memory_limiter/logs", "custom_proc", "resourcedetection", constants.LogsPPLPfx + "a", "batch/logs"},
|
||||
Name: "multiple rearrange with new pipelines",
|
||||
currentPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
|
||||
logsPipeline: []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
|
||||
expectedPipeline: []string{constants.LogsPPLPfx + "_z", "processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
|
||||
},
|
||||
}
|
||||
|
||||
func TestBuildLogsPipeline(t *testing.T) {
|
||||
for _, test := range BuildLogsPipelineTestData {
|
||||
Convey(test.Name, t, func() {
|
||||
v, err := buildCollectorPipelineProcessorsList(test.fromCollector, test.userPipelines)
|
||||
v, err := buildCollectorPipelineProcessorsList(test.currentPipeline, test.logsPipeline)
|
||||
So(err, ShouldBeNil)
|
||||
So(v, ShouldResemble, test.finalOutput)
|
||||
fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
|
||||
So(v, ShouldResemble, test.expectedPipeline)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user