Compare commits

...

2 Commits

Author SHA1 Message Date
Piyush Singariya
9bc8a02f32 fix: return error in enrich function 2026-03-24 13:14:55 +05:30
Piyush Singariya
8084efc162 fix: enrich unspecified fields 2026-03-24 13:01:37 +05:30
3 changed files with 103 additions and 9 deletions

View File

@@ -136,6 +136,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
signoz.SQLStore,
integrationsController.GetPipelinesForInstalledIntegrations,
reader,
)
if err != nil {
return nil, err

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"maps"
"slices"
"strings"
@@ -12,6 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils"
@@ -34,19 +36,101 @@ type LogParsingPipelineController struct {
Repo
GetIntegrationPipelines func(context.Context, string) ([]pipelinetypes.GettablePipeline, error)
// TODO(Piyush): remove with qbv5 migration
reader interfaces.Reader
}
func NewLogParsingPipelinesController(
sqlStore sqlstore.SQLStore,
getIntegrationPipelines func(context.Context, string) ([]pipelinetypes.GettablePipeline, error),
reader interfaces.Reader,
) (*LogParsingPipelineController, error) {
repo := NewRepo(sqlStore)
return &LogParsingPipelineController{
Repo: repo,
GetIntegrationPipelines: getIntegrationPipelines,
reader: reader,
}, nil
}
// enrichPipelinesFilters resolves the type (tag vs resource) for filter keys that are
// missing type info, by looking them up in the store.
//
// TODO(Piyush): remove with qbv5 migration
func (pc *LogParsingPipelineController) enrichPipelinesFilters(
ctx context.Context, pipelines []pipelinetypes.GettablePipeline,
) ([]pipelinetypes.GettablePipeline, error) {
// Collect names of non-static keys that are missing type info.
// Static fields (body, trace_id, etc.) are intentionally Unspecified and map
// to top-level OTEL fields — they do not need enrichment.
unspecifiedNames := map[string]bool{}
for _, p := range pipelines {
if p.Filter != nil {
for _, item := range p.Filter.Items {
if item.Key.Type == v3.AttributeKeyTypeUnspecified {
// Skip static fields
if _, isStatic := constants.StaticFieldsLogsV3[item.Key.Key]; isStatic {
continue
}
// Skip enrich body.* fields
if strings.HasPrefix(item.Key.Key, "body.") {
continue
}
unspecifiedNames[item.Key.Key] = true
}
}
}
}
if len(unspecifiedNames) == 0 {
return pipelines, nil
}
logFields, apiErr := pc.reader.GetLogFieldsFromNames(ctx, slices.Collect(maps.Keys(unspecifiedNames)))
if apiErr != nil {
slog.ErrorContext(ctx, "failed to fetch log fields for pipeline filter enrichment", "error", apiErr)
return pipelines, apiErr
}
// Build a simple name → AttributeKeyType map from the response.
fieldTypes := map[string]v3.AttributeKeyType{}
for _, f := range append(logFields.Selected, logFields.Interesting...) {
switch f.Type {
case constants.Resources:
fieldTypes[f.Name] = v3.AttributeKeyTypeResource
case constants.Attributes:
fieldTypes[f.Name] = v3.AttributeKeyTypeTag
}
}
// Set the resolved type on each untyped filter key in-place.
for i := range pipelines {
if pipelines[i].Filter != nil {
for j := range pipelines[i].Filter.Items {
key := &pipelines[i].Filter.Items[j].Key
if key.Type == v3.AttributeKeyTypeUnspecified {
// Skip static fields
if _, isStatic := constants.StaticFieldsLogsV3[key.Key]; isStatic {
continue
}
// Skip enrich body.* fields
if strings.HasPrefix(key.Key, "body.") {
continue
}
if t, ok := fieldTypes[key.Key]; ok {
key.Type = t
} else {
// default to attribute
key.Type = v3.AttributeKeyTypeTag
}
}
}
}
}
return pipelines, nil
}
// PipelinesResponse is used to prepare http response for pipelines config related requests
type PipelinesResponse struct {
*opamptypes.AgentConfigVersion
@@ -256,7 +340,12 @@ func (ic *LogParsingPipelineController) PreviewLogsPipelines(
ctx context.Context,
request *PipelinesPreviewRequest,
) (*PipelinesPreviewResponse, error) {
result, collectorLogs, err := SimulatePipelinesProcessing(ctx, request.Pipelines, request.Logs)
pipelines, err := ic.enrichPipelinesFilters(ctx, request.Pipelines)
if err != nil {
return nil, err
}
result, collectorLogs, err := SimulatePipelinesProcessing(ctx, pipelines, request.Logs)
if err != nil {
return nil, err
}
@@ -293,10 +382,8 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
if configVersion != nil {
pipelinesVersion = configVersion.Version
}
pipelinesResp, err := pc.GetPipelinesByVersion(
context.Background(), orgId, pipelinesVersion,
)
ctx := context.Background()
pipelinesResp, err := pc.GetPipelinesByVersion(ctx, orgId, pipelinesVersion)
if err != nil {
return nil, "", err
}
@@ -306,12 +393,17 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
return nil, "", errors.WrapInternalf(err, CodeRawPipelinesMarshalFailed, "could not serialize pipelines to JSON")
}
if querybuilder.BodyJSONQueryEnabled {
// add default normalize pipeline at the beginning, only for sending to collector
pipelinesResp.Pipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, pipelinesResp.Pipelines...)
enrichedPipelines, err := pc.enrichPipelinesFilters(ctx, pipelinesResp.Pipelines)
if err != nil {
return nil, "", err
}
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, pipelinesResp.Pipelines)
if querybuilder.BodyJSONQueryEnabled {
// add default normalize pipeline at the beginning, only for sending to collector
enrichedPipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, enrichedPipelines...)
}
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, enrichedPipelines)
if err != nil {
return nil, "", err
}

View File

@@ -121,6 +121,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
signoz.SQLStore,
integrationsController.GetPipelinesForInstalledIntegrations,
reader,
)
if err != nil {
return nil, err