Compare commits

..

3 Commits

Author SHA1 Message Date
Nikhil Soni
6114e7ae82 chore: add further filtering for required attribute columns 2026-05-18 11:28:40 +05:30
Nikhil Soni
91121fd9a1 refactor: move column selection to types 2026-05-18 10:57:48 +05:30
Nikhil Soni
877567d139 fix: only select the required attribute map 2026-05-18 10:50:44 +05:30
11 changed files with 50 additions and 541 deletions

2
go.mod
View File

@@ -23,7 +23,6 @@ require (
github.com/go-playground/validator/v10 v10.27.0
github.com/go-redis/redismock/v9 v9.2.0
github.com/go-viper/mapstructure/v2 v2.5.0
github.com/goccy/go-yaml v1.19.2
github.com/gojek/heimdall/v7 v7.0.3
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/google/uuid v1.6.0
@@ -132,6 +131,7 @@ require (
github.com/go-openapi/swag/yamlutils v0.25.5 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/goccy/go-yaml v1.19.2 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/hashicorp/go-metrics v0.5.4 // indirect
github.com/huandu/go-clone v1.7.3 // indirect

View File

@@ -2,17 +2,12 @@ package implspanmapper
import (
"context"
"encoding/json"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
var _ spanmapper.Module = (*module)(nil)
type module struct {
store spantypes.SpanMapperStore
}
@@ -39,22 +34,11 @@ func (module *module) UpdateGroup(ctx context.Context, orgID, id valuer.UUID, na
return err
}
group.Update(name, condition, enabled, updatedBy)
err = module.store.UpdateGroup(ctx, group)
if err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
return module.store.UpdateGroup(ctx, group)
}
func (module *module) DeleteGroup(ctx context.Context, orgID, id valuer.UUID) error {
err := module.store.DeleteGroup(ctx, orgID, id)
if err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
return module.store.DeleteGroup(ctx, orgID, id)
}
func (module *module) ListMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*spantypes.SpanMapper, error) {
@@ -70,12 +54,7 @@ func (module *module) CreateMapper(ctx context.Context, orgID, groupID valuer.UU
if _, err := module.store.GetGroup(ctx, orgID, groupID); err != nil {
return err
}
err := module.store.CreateMapper(ctx, mapper)
if err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
return module.store.CreateMapper(ctx, mapper)
}
func (module *module) UpdateMapper(ctx context.Context, orgID, groupID, id valuer.UUID, fieldContext spantypes.FieldContext, config *spantypes.SpanMapperConfig, enabled *bool, updatedBy string) error {
@@ -87,72 +66,9 @@ func (module *module) UpdateMapper(ctx context.Context, orgID, groupID, id value
return err
}
mapper.Update(fieldContext, config, enabled, updatedBy)
err = module.store.UpdateMapper(ctx, mapper)
if err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
return module.store.UpdateMapper(ctx, mapper)
}
func (module *module) DeleteMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error {
err := module.store.DeleteMapper(ctx, orgID, groupID, id)
if err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (module *module) AgentFeatureType() agentConf.AgentFeatureType {
return spantypes.SpanAttrMappingFeatureType
}
func (module *module) RecommendAgentConfig(orgID valuer.UUID, currentConfYaml []byte, configVersion *opamptypes.AgentConfigVersion) ([]byte, string, error) {
ctx := context.Background()
enabled, err := module.listEnabledGroupsWithMappers(ctx, orgID)
if err != nil {
return nil, "", err
}
updatedConf, err := spantypes.GenerateCollectorConfigWithSpanMapperProcessor(currentConfYaml, enabled)
if err != nil {
return nil, "", err
}
serialized, err := json.Marshal(enabled)
if err != nil {
return nil, "", err
}
return updatedConf, string(serialized), nil
}
// listEnabledGroupsWithMappers returns groups with their mappers.
func (module *module) listEnabledGroupsWithMappers(ctx context.Context, orgID valuer.UUID) ([]*spantypes.SpanMapperGroupWithMappers, error) {
enabled := true
groups, err := module.store.ListGroups(ctx, orgID, &spantypes.ListSpanMapperGroupsQuery{Enabled: &enabled})
if err != nil {
return nil, err
}
out := make([]*spantypes.SpanMapperGroupWithMappers, 0, len(groups))
for _, g := range groups {
mappers, err := module.store.ListMappers(ctx, orgID, g.ID)
if err != nil {
return nil, err
}
if len(mappers) == 0 {
continue
}
enabledMappers := make([]*spantypes.SpanMapper, 0, len(mappers))
for _, m := range mappers {
if m.Enabled {
enabledMappers = append(enabledMappers, m)
}
}
out = append(out, &spantypes.SpanMapperGroupWithMappers{Group: g, Mappers: enabledMappers})
}
return out, nil
return module.store.DeleteMapper(ctx, orgID, groupID, id)
}

View File

@@ -4,16 +4,12 @@ import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Module defines the business logic for span attribute mapping groups and mappers.
type Module interface {
// Since this module interacts with OpAMP, it must implement the AgentFeature interface.
agentConf.AgentFeature
// Group operations
ListGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.SpanMapperGroup, error)
GetGroup(ctx context.Context, orgID, id valuer.UUID) (*spantypes.SpanMapperGroup, error)

View File

@@ -1156,9 +1156,11 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
r.logger.Info("cache miss for getFlamegraphSpansForTrace", "traceID", traceID)
selectCols := "timestamp, duration_nano, span_id, trace_id, has_error, links as references, resource_string_service$$name, name, events"
if len(req.SelectFields) > 0 {
selectCols += ", attributes_string, attributes_number, attributes_bool, resources_string"
selectFieldCols := req.GetSelectedFieldsSourceColumns()
if len(selectFieldCols) > 0 {
selectCols = fmt.Sprintf("%s, %s", selectCols, strings.Join(selectFieldCols, ", "))
}
flamegraphQuery := fmt.Sprintf("SELECT %s FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", selectCols, r.TraceDB, r.traceTableName)
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, flamegraphQuery)

View File

@@ -346,6 +346,45 @@ type GetFlamegraphSpansForTraceParams struct {
SelectFields []telemetrytypes.TelemetryFieldKey `json:"selectFields"`
}
func (r *GetFlamegraphSpansForTraceParams) GetSelectedFieldsSourceColumns() []string {
var needsAttrString, needsAttrNumber, needsAttrBool, needsResourceMap bool
for _, f := range r.SelectFields {
switch f.FieldContext {
case telemetrytypes.FieldContextAttribute:
switch f.FieldDataType {
case telemetrytypes.FieldDataTypeString:
needsAttrString = true
case telemetrytypes.FieldDataTypeFloat64, telemetrytypes.FieldDataTypeNumber, telemetrytypes.FieldDataTypeInt64:
needsAttrNumber = true
case telemetrytypes.FieldDataTypeBool:
needsAttrBool = true
default:
// Unknown type: AttributeValue searches all three maps, so we need all.
needsAttrString = true
needsAttrNumber = true
needsAttrBool = true
}
case telemetrytypes.FieldContextResource:
needsResourceMap = true
}
}
var cols []string
if needsAttrString {
cols = append(cols, "attributes_string")
}
if needsAttrNumber {
cols = append(cols, "attributes_number")
}
if needsAttrBool {
cols = append(cols, "attributes_bool")
}
if needsResourceMap {
cols = append(cols, "resources_string")
}
return cols
}
type SpanFilterParams struct {
TraceID []string `json:"traceID"`
Status []string `json:"status"`

View File

@@ -13,9 +13,7 @@ var (
ErrCodeMappingGroupAlreadyExists = errors.MustNewCode("span_attribute_mapping_group_already_exists")
)
// SpanMapperGroupCondition gates whether a group's rules run for a given span.
// A group runs when any attribute or resource key on the span CONTAINS one of
// the listed substrings (plain substring match — no glob syntax).
// A group runs when any of the listed attribute/resource key patterns match.
type SpanMapperGroupCondition struct {
Attributes []string `json:"attributes" required:"true" nullable:"true"`
Resource []string `json:"resource" required:"true" nullable:"true"`

View File

@@ -1,144 +0,0 @@
package spantypes
import (
"sort"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/goccy/go-yaml"
)
const (
SpanAttrMappingFeatureType agentConf.AgentFeatureType = "span_attr_mapping"
ProcessorName = "signozspanmapper"
)
var (
ErrCodeInvalidCollectorConfig = errors.MustNewCode("invalid_collector_config")
ErrCodeBuildMappingProcessorConfig = errors.MustNewCode("build_mapping_processor_config")
)
type SpanMapperGroupWithMappers struct {
Group *SpanMapperGroup `json:"group"`
Mappers []*SpanMapper `json:"mappers"`
}
// spanMapperProcessorConfig is the collector config for signozspanmapper.
type spanMapperProcessorConfig struct {
Groups []spanMapperProcessorGroup `yaml:"groups" json:"groups"`
}
type spanMapperProcessorGroup struct {
ID string `yaml:"id" json:"id"`
ExistsAny spanMapperProcessorExistsAny `yaml:"exists_any" json:"exists_any"`
Attributes []spanMapperProcessorAttribute `yaml:"attributes" json:"attributes"`
}
type spanMapperProcessorExistsAny struct {
Attributes []string `yaml:"attributes,omitempty" json:"attributes,omitempty"`
Resource []string `yaml:"resource,omitempty" json:"resource,omitempty"`
}
type spanMapperProcessorAttribute struct {
Target string `yaml:"target" json:"target"`
Context string `yaml:"context,omitempty" json:"context,omitempty"`
Action string `yaml:"action,omitempty" json:"action,omitempty"`
Sources []string `yaml:"sources" json:"sources"`
}
func GenerateCollectorConfigWithSpanMapperProcessor(currentConfYaml []byte, groups []*SpanMapperGroupWithMappers) ([]byte, error) {
var collectorConf map[string]any
if err := yaml.Unmarshal(currentConfYaml, &collectorConf); err != nil {
return nil, errors.Wrapf(err, errors.TypeInvalidInput, ErrCodeInvalidCollectorConfig, "failed to unmarshal collector config")
}
// rare but don't do anything in this case, also means it's just comments.
if collectorConf == nil {
collectorConf = map[string]any{}
}
processors := map[string]any{}
if existing, ok := collectorConf["processors"]; ok && existing != nil {
p, ok := existing.(map[string]any)
if !ok {
return nil, errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidCollectorConfig, "collector config 'processors' must be a mapping, got %T", existing)
}
processors = p
}
procConfig := buildProcessorConfig(groups)
configBytes, err := yaml.Marshal(procConfig)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeBuildMappingProcessorConfig, "failed to marshal span mapper processor config")
}
var configMap any
if err := yaml.Unmarshal(configBytes, &configMap); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeBuildMappingProcessorConfig, "failed to re-unmarshal span mapper processor config")
}
processors[ProcessorName] = configMap
collectorConf["processors"] = processors
out, err := yaml.Marshal(collectorConf)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeBuildMappingProcessorConfig, "failed to marshal collector config")
}
return out, nil
}
func buildProcessorConfig(groups []*SpanMapperGroupWithMappers) *spanMapperProcessorConfig {
out := make([]spanMapperProcessorGroup, 0, len(groups))
for _, gm := range groups {
rules := make([]spanMapperProcessorAttribute, 0, len(gm.Mappers))
for _, m := range gm.Mappers {
rules = append(rules, buildAttributeRule(m))
}
out = append(out, spanMapperProcessorGroup{
ID: gm.Group.Name,
ExistsAny: spanMapperProcessorExistsAny{
Attributes: gm.Group.Condition.Attributes,
Resource: gm.Group.Condition.Resource,
},
Attributes: rules,
})
}
return &spanMapperProcessorConfig{Groups: out}
}
// buildAttributeRule maps a single SpanMapper to a collector attribute rule.
// Sources are sorted by Priority DESC (highest-priority first), and read-from-
// resource sources are encoded via the "resource." prefix.
func buildAttributeRule(m *SpanMapper) spanMapperProcessorAttribute {
sources := make([]SpanMapperSource, len(m.Config.Sources))
copy(sources, m.Config.Sources)
sort.SliceStable(sources, func(i, j int) bool { return sources[i].Priority > sources[j].Priority })
keys := make([]string, 0, len(sources))
for _, s := range sources {
if s.Context == FieldContextResource {
keys = append(keys, FieldContextResource.StringValue()+"."+s.Key)
} else {
keys = append(keys, s.Key)
}
}
action := SpanMapperOperationCopy
if len(sources) > 0 && sources[0].Operation == SpanMapperOperationMove {
action = SpanMapperOperationMove
}
ctx := FieldContextSpanAttribute
if m.FieldContext == FieldContextResource {
ctx = FieldContextResource
}
return spanMapperProcessorAttribute{
Target: m.Name,
Context: ctx.StringValue(),
Action: action.StringValue(),
Sources: keys,
}
}

View File

@@ -1,193 +0,0 @@
package spantypes
import (
"os"
"path/filepath"
"testing"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
func TestGenerateCollectorConfigWithSpanMapperProcessor(t *testing.T) {
t.Parallel()
baseline := loadFixture(t, "collector_baseline.yaml")
tests := []struct {
name string
groups []*SpanMapperGroupWithMappers
want string
}{
{
name: "no_groups",
want: "collector_no_groups.yaml",
},
{
name: "with_groups",
groups: []*SpanMapperGroupWithMappers{
{
Group: newGroup("llm", []string{"model"}, []string{"service.name"}),
Mappers: []*SpanMapper{
newMapper("gen_ai.request.model", FieldContextResource,
attrSrc("gen_ai.llm.model", SpanMapperOperationCopy, 3),
attrSrc("llm.model", SpanMapperOperationCopy, 2),
resSrc("service.name", SpanMapperOperationCopy, 1),
),
newMapper("gen_ai.request.tokens", FieldContextSpanAttribute,
attrSrc("gen_ai.request_tokens", SpanMapperOperationCopy, 2),
attrSrc("llm.tokens", SpanMapperOperationCopy, 1),
),
newMapper("gen_ai.request.input", FieldContextSpanAttribute,
attrSrc("gen_ai.input", SpanMapperOperationMove, 2),
attrSrc("llm.input", SpanMapperOperationMove, 1),
),
},
},
{
Group: newGroup("agent", []string{"agent."}, nil),
Mappers: []*SpanMapper{
newMapper("gen_ai.agent.name", FieldContextSpanAttribute,
attrSrc("agent.name", SpanMapperOperationCopy, 2),
attrSrc("llm.agent.name", SpanMapperOperationCopy, 1),
),
newMapper("gen_ai.agent.id", FieldContextSpanAttribute,
attrSrc("gen_ai.agent.id", SpanMapperOperationCopy, 2),
attrSrc("llm.agent.id", SpanMapperOperationCopy, 1),
),
},
},
{
Group: newGroup("tool", []string{"agent."}, nil),
Mappers: []*SpanMapper{
newMapper("gen_ai.tool.name", FieldContextSpanAttribute,
attrSrc("ai.tool.name", SpanMapperOperationCopy, 2),
attrSrc("llm.tool.name", SpanMapperOperationCopy, 1),
),
},
},
},
want: "collector_with_groups.yaml",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
got, err := GenerateCollectorConfigWithSpanMapperProcessor(baseline, tc.groups)
require.NoError(t, err)
assertYAMLEqual(t, loadFixture(t, tc.want), got)
})
}
}
func TestGenerateCollectorConfigWithSpanMapperProcessor_Errors(t *testing.T) {
t.Parallel()
tests := []struct {
name string
in []byte
}{
{"processors_not_a_map", []byte("processors: not-a-map\n")},
{"malformed_yaml", []byte(": :")},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
_, err := GenerateCollectorConfigWithSpanMapperProcessor(tc.in, nil)
require.Error(t, err)
assert.True(t, errors.Ast(err, errors.TypeInvalidInput), "want TypeInvalidInput, got %v", err)
assert.True(t, errors.Asc(err, ErrCodeInvalidCollectorConfig), "want ErrCodeInvalidCollectorConfig, got %v", err)
})
}
}
func TestBuildAttributeRule(t *testing.T) {
t.Parallel()
tests := []struct {
name string
mapper *SpanMapper
want spanMapperProcessorAttribute
}{
{
name: "priority_sort_and_resource_prefix",
mapper: newMapper("gen_ai.request.model", FieldContextResource,
attrSrc("llm.model", SpanMapperOperationCopy, 20),
resSrc("service.name", SpanMapperOperationCopy, 10),
attrSrc("gen_ai.llm.model", SpanMapperOperationCopy, 30),
),
want: spanMapperProcessorAttribute{
Target: "gen_ai.request.model",
Context: FieldContextResource.StringValue(),
Action: SpanMapperOperationCopy.StringValue(),
Sources: []string{"gen_ai.llm.model", "llm.model", "resource.service.name"},
},
},
{
name: "move_action_follows_highest_priority_source",
mapper: newMapper("gen_ai.request.input", FieldContextSpanAttribute,
attrSrc("gen_ai.input", SpanMapperOperationMove, 20),
attrSrc("llm.input", SpanMapperOperationMove, 10),
),
want: spanMapperProcessorAttribute{
Target: "gen_ai.request.input",
Context: FieldContextSpanAttribute.StringValue(),
Action: SpanMapperOperationMove.StringValue(),
Sources: []string{"gen_ai.input", "llm.input"},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
assert.Equal(t, tc.want, buildAttributeRule(tc.mapper))
})
}
}
func loadFixture(t *testing.T, name string) []byte {
t.Helper()
b, err := os.ReadFile(filepath.Join("testdata", name))
require.NoError(t, err)
return b
}
// assertYAMLEqual compares two YAML documents structurally so key order and
// slice formatting do not matter.
func assertYAMLEqual(t *testing.T, want, got []byte) {
t.Helper()
var w, g any
require.NoError(t, yaml.Unmarshal(want, &w))
require.NoError(t, yaml.Unmarshal(got, &g))
assert.Equal(t, w, g)
}
func newGroup(name string, attrs, res []string) *SpanMapperGroup {
return &SpanMapperGroup{
Name: name,
Condition: SpanMapperGroupCondition{Attributes: attrs, Resource: res},
Enabled: true,
}
}
func newMapper(name string, target FieldContext, sources ...SpanMapperSource) *SpanMapper {
return &SpanMapper{
Name: name,
FieldContext: target,
Config: SpanMapperConfig{Sources: sources},
Enabled: true,
}
}
func attrSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
return SpanMapperSource{Key: key, Context: FieldContextSpanAttribute, Operation: op, Priority: priority}
}
func resSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
return SpanMapperSource{Key: key, Context: FieldContextResource, Operation: op, Priority: priority}
}

View File

@@ -1,17 +0,0 @@
receivers:
otlp:
protocols:
grpc:
processors:
signozspanmapper:
groups: []
batch: {}
exporters:
otlp:
endpoint: localhost:4317
service:
pipelines:
traces:
receivers: [otlp]
processors: [signozspanmapper, batch]
exporters: [otlp]

View File

@@ -1,17 +0,0 @@
receivers:
otlp:
protocols:
grpc:
processors:
signozspanmapper:
groups: []
batch: {}
exporters:
otlp:
endpoint: localhost:4317
service:
pipelines:
traces:
receivers: [otlp]
processors: [signozspanmapper, batch]
exporters: [otlp]

View File

@@ -1,71 +0,0 @@
receivers:
otlp:
protocols:
grpc:
processors:
signozspanmapper:
groups:
- id: llm
exists_any:
attributes:
- model
resource:
- service.name
attributes:
- target: gen_ai.request.model
context: resource
action: copy
sources:
- gen_ai.llm.model
- llm.model
- resource.service.name
- target: gen_ai.request.tokens
context: attribute
action: copy
sources:
- gen_ai.request_tokens
- llm.tokens
- target: gen_ai.request.input
context: attribute
action: move
sources:
- gen_ai.input
- llm.input
- id: agent
exists_any:
attributes:
- agent.
attributes:
- target: gen_ai.agent.name
context: attribute
action: copy
sources:
- agent.name
- llm.agent.name
- target: gen_ai.agent.id
context: attribute
action: copy
sources:
- gen_ai.agent.id
- llm.agent.id
- id: tool
exists_any:
attributes:
- agent.
attributes:
- target: gen_ai.tool.name
context: attribute
action: copy
sources:
- ai.tool.name
- llm.tool.name
batch: {}
exporters:
otlp:
endpoint: localhost:4317
service:
pipelines:
traces:
receivers: [otlp]
processors: [signozspanmapper, batch]
exporters: [otlp]