Compare commits

..

1 Commits

Author SHA1 Message Date
Piyush Singariya
ea2663b145 fix: enrich unspecified fields in logs pipelines filters (#10686)
Some checks are pending
build-staging / prepare (push) Waiting to run
build-staging / js-build (push) Blocked by required conditions
build-staging / go-build (push) Blocked by required conditions
build-staging / staging (push) Blocked by required conditions
Release Drafter / update_release_draft (push) Waiting to run
* fix: enrich unspecified fields

* fix: return error in enrich function

* chore: nit change asked
2026-03-25 05:01:26 +00:00
5 changed files with 108 additions and 80 deletions

View File

@@ -136,6 +136,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
signoz.SQLStore,
integrationsController.GetPipelinesForInstalledIntegrations,
reader,
)
if err != nil {
return nil, err

View File

@@ -1,65 +0,0 @@
import APIError from 'types/api/error';
import { errorDetails } from '../utils';
function makeAPIError(
message: string,
code = 'SOME_CODE',
errors: { message: string }[] = [],
): APIError {
return new APIError({
httpStatusCode: 500,
error: { code, message, url: '', errors },
});
}
describe('errorDetails', () => {
describe('when passed an APIError', () => {
it('returns the error message', () => {
const error = makeAPIError('something went wrong');
expect(errorDetails(error)).toBe('something went wrong');
});
it('appends details when errors array is non-empty', () => {
const error = makeAPIError('query failed', 'QUERY_ERROR', [
{ message: 'field X is invalid' },
{ message: 'field Y is missing' },
]);
const result = errorDetails(error);
expect(result).toContain('query failed');
expect(result).toContain('field X is invalid');
expect(result).toContain('field Y is missing');
});
it('does not append details when errors array is empty', () => {
const error = makeAPIError('simple error', 'CODE', []);
const result = errorDetails(error);
expect(result).toBe('simple error');
expect(result).not.toContain('Details');
});
});
describe('when passed a plain Error (not an APIError)', () => {
it('does not throw', () => {
const error = new Error('timeout exceeded');
expect(() => errorDetails(error)).not.toThrow();
});
it('returns the plain error message', () => {
const error = new Error('timeout exceeded');
expect(errorDetails(error)).toBe('timeout exceeded');
});
it('returns fallback when plain Error has no message', () => {
const error = new Error('');
expect(errorDetails(error)).toBe('Unknown error occurred');
});
});
describe('fallback behaviour', () => {
it('returns "Unknown error occurred" when message is undefined', () => {
const error = makeAPIError('');
expect(errorDetails(error)).toBe('Unknown error occurred');
});
});
});

View File

@@ -249,14 +249,13 @@ export const handleGraphClick = async ({
}
};
export const errorDetails = (error: APIError | Error): string => {
const { message, errors } =
(error instanceof APIError ? error.getErrorDetails()?.error : null) || {};
export const errorDetails = (error: APIError): string => {
const { message, errors } = error.getErrorDetails()?.error || {};
const details =
errors && errors.length > 0
errors?.length > 0
? `\n\nDetails: ${errors.map((e) => e.message).join('\n')}`
: '';
const errorDetails = `${message ?? error.message} ${details}`;
return errorDetails.trim() || 'Unknown error occurred';
const errorDetails = `${message} ${details}`;
return errorDetails || 'Unknown error occurred';
};

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"maps"
"slices"
"strings"
@@ -12,6 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils"
@@ -34,19 +36,101 @@ type LogParsingPipelineController struct {
Repo
GetIntegrationPipelines func(context.Context, string) ([]pipelinetypes.GettablePipeline, error)
// TODO(Piyush): remove with qbv5 migration
reader interfaces.Reader
}
func NewLogParsingPipelinesController(
sqlStore sqlstore.SQLStore,
getIntegrationPipelines func(context.Context, string) ([]pipelinetypes.GettablePipeline, error),
reader interfaces.Reader,
) (*LogParsingPipelineController, error) {
repo := NewRepo(sqlStore)
return &LogParsingPipelineController{
Repo: repo,
GetIntegrationPipelines: getIntegrationPipelines,
reader: reader,
}, nil
}
// enrichPipelinesFilters resolves the type (tag vs resource) for filter keys that are
// missing type info, by looking them up in the store.
//
// TODO(Piyush): remove with qbv5 migration
func (pc *LogParsingPipelineController) enrichPipelinesFilters(
ctx context.Context, pipelines []pipelinetypes.GettablePipeline,
) ([]pipelinetypes.GettablePipeline, error) {
// Collect names of non-static keys that are missing type info.
// Static fields (body, trace_id, etc.) are intentionally Unspecified and map
// to top-level OTEL fields — they do not need enrichment.
unspecifiedNames := map[string]struct{}{}
for _, p := range pipelines {
if p.Filter != nil {
for _, item := range p.Filter.Items {
if item.Key.Type == v3.AttributeKeyTypeUnspecified {
// Skip static fields
if _, isStatic := constants.StaticFieldsLogsV3[item.Key.Key]; isStatic {
continue
}
// Skip enrich body.* fields
if strings.HasPrefix(item.Key.Key, "body.") {
continue
}
unspecifiedNames[item.Key.Key] = struct{}{}
}
}
}
}
if len(unspecifiedNames) == 0 {
return pipelines, nil
}
logFields, apiErr := pc.reader.GetLogFieldsFromNames(ctx, slices.Collect(maps.Keys(unspecifiedNames)))
if apiErr != nil {
slog.ErrorContext(ctx, "failed to fetch log fields for pipeline filter enrichment", "error", apiErr)
return pipelines, apiErr
}
// Build a simple name → AttributeKeyType map from the response.
fieldTypes := map[string]v3.AttributeKeyType{}
for _, f := range append(logFields.Selected, logFields.Interesting...) {
switch f.Type {
case constants.Resources:
fieldTypes[f.Name] = v3.AttributeKeyTypeResource
case constants.Attributes:
fieldTypes[f.Name] = v3.AttributeKeyTypeTag
}
}
// Set the resolved type on each untyped filter key in-place.
for i := range pipelines {
if pipelines[i].Filter != nil {
for j := range pipelines[i].Filter.Items {
key := &pipelines[i].Filter.Items[j].Key
if key.Type == v3.AttributeKeyTypeUnspecified {
// Skip static fields
if _, isStatic := constants.StaticFieldsLogsV3[key.Key]; isStatic {
continue
}
// Skip enrich body.* fields
if strings.HasPrefix(key.Key, "body.") {
continue
}
if t, ok := fieldTypes[key.Key]; ok {
key.Type = t
} else {
// default to attribute
key.Type = v3.AttributeKeyTypeTag
}
}
}
}
}
return pipelines, nil
}
// PipelinesResponse is used to prepare http response for pipelines config related requests
type PipelinesResponse struct {
*opamptypes.AgentConfigVersion
@@ -256,7 +340,12 @@ func (ic *LogParsingPipelineController) PreviewLogsPipelines(
ctx context.Context,
request *PipelinesPreviewRequest,
) (*PipelinesPreviewResponse, error) {
result, collectorLogs, err := SimulatePipelinesProcessing(ctx, request.Pipelines, request.Logs)
pipelines, err := ic.enrichPipelinesFilters(ctx, request.Pipelines)
if err != nil {
return nil, err
}
result, collectorLogs, err := SimulatePipelinesProcessing(ctx, pipelines, request.Logs)
if err != nil {
return nil, err
}
@@ -293,10 +382,8 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
if configVersion != nil {
pipelinesVersion = configVersion.Version
}
pipelinesResp, err := pc.GetPipelinesByVersion(
context.Background(), orgId, pipelinesVersion,
)
ctx := context.Background()
pipelinesResp, err := pc.GetPipelinesByVersion(ctx, orgId, pipelinesVersion)
if err != nil {
return nil, "", err
}
@@ -306,12 +393,17 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
return nil, "", errors.WrapInternalf(err, CodeRawPipelinesMarshalFailed, "could not serialize pipelines to JSON")
}
if querybuilder.BodyJSONQueryEnabled {
// add default normalize pipeline at the beginning, only for sending to collector
pipelinesResp.Pipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, pipelinesResp.Pipelines...)
enrichedPipelines, err := pc.enrichPipelinesFilters(ctx, pipelinesResp.Pipelines)
if err != nil {
return nil, "", err
}
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, pipelinesResp.Pipelines)
if querybuilder.BodyJSONQueryEnabled {
// add default normalize pipeline at the beginning, only for sending to collector
enrichedPipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, enrichedPipelines...)
}
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, enrichedPipelines)
if err != nil {
return nil, "", err
}

View File

@@ -121,6 +121,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
signoz.SQLStore,
integrationsController.GetPipelinesForInstalledIntegrations,
reader,
)
if err != nil {
return nil, err