|
|
|
|
@@ -4,6 +4,7 @@ import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"maps"
|
|
|
|
|
"slices"
|
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
|
|
@@ -12,6 +13,7 @@ import (
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/errors"
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/query-service/constants"
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/query-service/model"
|
|
|
|
|
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
|
|
|
|
@@ -34,19 +36,101 @@ type LogParsingPipelineController struct {
|
|
|
|
|
Repo
|
|
|
|
|
|
|
|
|
|
GetIntegrationPipelines func(context.Context, string) ([]pipelinetypes.GettablePipeline, error)
|
|
|
|
|
// TODO(Piyush): remove with qbv5 migration
|
|
|
|
|
reader interfaces.Reader
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewLogParsingPipelinesController(
|
|
|
|
|
sqlStore sqlstore.SQLStore,
|
|
|
|
|
getIntegrationPipelines func(context.Context, string) ([]pipelinetypes.GettablePipeline, error),
|
|
|
|
|
reader interfaces.Reader,
|
|
|
|
|
) (*LogParsingPipelineController, error) {
|
|
|
|
|
repo := NewRepo(sqlStore)
|
|
|
|
|
return &LogParsingPipelineController{
|
|
|
|
|
Repo: repo,
|
|
|
|
|
GetIntegrationPipelines: getIntegrationPipelines,
|
|
|
|
|
reader: reader,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// enrichPipelinesFilters resolves the type (tag vs resource) for filter keys that are
|
|
|
|
|
// missing type info, by looking them up in the store.
|
|
|
|
|
//
|
|
|
|
|
// TODO(Piyush): remove with qbv5 migration
|
|
|
|
|
func (pc *LogParsingPipelineController) enrichPipelinesFilters(
|
|
|
|
|
ctx context.Context, pipelines []pipelinetypes.GettablePipeline,
|
|
|
|
|
) ([]pipelinetypes.GettablePipeline, error) {
|
|
|
|
|
// Collect names of non-static keys that are missing type info.
|
|
|
|
|
// Static fields (body, trace_id, etc.) are intentionally Unspecified and map
|
|
|
|
|
// to top-level OTEL fields — they do not need enrichment.
|
|
|
|
|
unspecifiedNames := map[string]bool{}
|
|
|
|
|
for _, p := range pipelines {
|
|
|
|
|
if p.Filter != nil {
|
|
|
|
|
for _, item := range p.Filter.Items {
|
|
|
|
|
if item.Key.Type == v3.AttributeKeyTypeUnspecified {
|
|
|
|
|
// Skip static fields
|
|
|
|
|
if _, isStatic := constants.StaticFieldsLogsV3[item.Key.Key]; isStatic {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
// Skip enrich body.* fields
|
|
|
|
|
if strings.HasPrefix(item.Key.Key, "body.") {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
unspecifiedNames[item.Key.Key] = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if len(unspecifiedNames) == 0 {
|
|
|
|
|
return pipelines, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logFields, apiErr := pc.reader.GetLogFieldsFromNames(ctx, slices.Collect(maps.Keys(unspecifiedNames)))
|
|
|
|
|
if apiErr != nil {
|
|
|
|
|
slog.ErrorContext(ctx, "failed to fetch log fields for pipeline filter enrichment", "error", apiErr)
|
|
|
|
|
return pipelines, apiErr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build a simple name → AttributeKeyType map from the response.
|
|
|
|
|
fieldTypes := map[string]v3.AttributeKeyType{}
|
|
|
|
|
for _, f := range append(logFields.Selected, logFields.Interesting...) {
|
|
|
|
|
switch f.Type {
|
|
|
|
|
case constants.Resources:
|
|
|
|
|
fieldTypes[f.Name] = v3.AttributeKeyTypeResource
|
|
|
|
|
case constants.Attributes:
|
|
|
|
|
fieldTypes[f.Name] = v3.AttributeKeyTypeTag
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Set the resolved type on each untyped filter key in-place.
|
|
|
|
|
for i := range pipelines {
|
|
|
|
|
if pipelines[i].Filter != nil {
|
|
|
|
|
for j := range pipelines[i].Filter.Items {
|
|
|
|
|
key := &pipelines[i].Filter.Items[j].Key
|
|
|
|
|
if key.Type == v3.AttributeKeyTypeUnspecified {
|
|
|
|
|
// Skip static fields
|
|
|
|
|
if _, isStatic := constants.StaticFieldsLogsV3[key.Key]; isStatic {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
// Skip enrich body.* fields
|
|
|
|
|
if strings.HasPrefix(key.Key, "body.") {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if t, ok := fieldTypes[key.Key]; ok {
|
|
|
|
|
key.Type = t
|
|
|
|
|
} else {
|
|
|
|
|
// default to attribute
|
|
|
|
|
key.Type = v3.AttributeKeyTypeTag
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return pipelines, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// PipelinesResponse is used to prepare http response for pipelines config related requests
|
|
|
|
|
type PipelinesResponse struct {
|
|
|
|
|
*opamptypes.AgentConfigVersion
|
|
|
|
|
@@ -256,7 +340,12 @@ func (ic *LogParsingPipelineController) PreviewLogsPipelines(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
request *PipelinesPreviewRequest,
|
|
|
|
|
) (*PipelinesPreviewResponse, error) {
|
|
|
|
|
result, collectorLogs, err := SimulatePipelinesProcessing(ctx, request.Pipelines, request.Logs)
|
|
|
|
|
pipelines, err := ic.enrichPipelinesFilters(ctx, request.Pipelines)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result, collectorLogs, err := SimulatePipelinesProcessing(ctx, pipelines, request.Logs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
@@ -293,10 +382,8 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
|
|
|
|
|
if configVersion != nil {
|
|
|
|
|
pipelinesVersion = configVersion.Version
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pipelinesResp, err := pc.GetPipelinesByVersion(
|
|
|
|
|
context.Background(), orgId, pipelinesVersion,
|
|
|
|
|
)
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
pipelinesResp, err := pc.GetPipelinesByVersion(ctx, orgId, pipelinesVersion)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, "", err
|
|
|
|
|
}
|
|
|
|
|
@@ -306,12 +393,17 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
|
|
|
|
|
return nil, "", errors.WrapInternalf(err, CodeRawPipelinesMarshalFailed, "could not serialize pipelines to JSON")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if querybuilder.BodyJSONQueryEnabled {
|
|
|
|
|
// add default normalize pipeline at the beginning, only for sending to collector
|
|
|
|
|
pipelinesResp.Pipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, pipelinesResp.Pipelines...)
|
|
|
|
|
enrichedPipelines, err := pc.enrichPipelinesFilters(ctx, pipelinesResp.Pipelines)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, "", err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, pipelinesResp.Pipelines)
|
|
|
|
|
if querybuilder.BodyJSONQueryEnabled {
|
|
|
|
|
// add default normalize pipeline at the beginning, only for sending to collector
|
|
|
|
|
enrichedPipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, enrichedPipelines...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, enrichedPipelines)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, "", err
|
|
|
|
|
}
|
|
|
|
|
|