mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-18 16:00:32 +01:00
Compare commits
3 Commits
ns/flamegr
...
issue_4361
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c71b16ff18 | ||
|
|
3c86f82e8f | ||
|
|
96b6c6caba |
@@ -11,6 +11,7 @@ function makeSpan(
|
||||
): FlamegraphSpan {
|
||||
return {
|
||||
parentSpanId: '',
|
||||
traceId: 'trace-1',
|
||||
hasError: false,
|
||||
serviceName: 'svc',
|
||||
name: 'op',
|
||||
|
||||
@@ -6,6 +6,7 @@ export const MOCK_SPAN: FlamegraphSpan = {
|
||||
durationNano: 50_000_000, // 50ms
|
||||
spanId: 'span-1',
|
||||
parentSpanId: '',
|
||||
traceId: 'trace-1',
|
||||
hasError: false,
|
||||
serviceName: 'test-service',
|
||||
name: 'test-span',
|
||||
|
||||
@@ -23,6 +23,7 @@ export interface FlamegraphSpan {
|
||||
durationNano: number;
|
||||
spanId: string;
|
||||
parentSpanId: string;
|
||||
traceId: string;
|
||||
hasError: boolean;
|
||||
serviceName: string;
|
||||
name: string;
|
||||
|
||||
2
go.mod
2
go.mod
@@ -23,6 +23,7 @@ require (
|
||||
github.com/go-playground/validator/v10 v10.27.0
|
||||
github.com/go-redis/redismock/v9 v9.2.0
|
||||
github.com/go-viper/mapstructure/v2 v2.5.0
|
||||
github.com/goccy/go-yaml v1.19.2
|
||||
github.com/gojek/heimdall/v7 v7.0.3
|
||||
github.com/golang-jwt/jwt/v5 v5.3.1
|
||||
github.com/google/uuid v1.6.0
|
||||
@@ -131,7 +132,6 @@ require (
|
||||
github.com/go-openapi/swag/yamlutils v0.25.5 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/goccy/go-yaml v1.19.2 // indirect
|
||||
github.com/google/go-cmp v0.7.0 // indirect
|
||||
github.com/hashicorp/go-metrics v0.5.4 // indirect
|
||||
github.com/huandu/go-clone v1.7.3 // indirect
|
||||
|
||||
@@ -2,12 +2,17 @@ package implspanmapper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/spantypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
var _ spanmapper.Module = (*module)(nil)
|
||||
|
||||
type module struct {
|
||||
store spantypes.SpanMapperStore
|
||||
}
|
||||
@@ -34,11 +39,22 @@ func (module *module) UpdateGroup(ctx context.Context, orgID, id valuer.UUID, na
|
||||
return err
|
||||
}
|
||||
group.Update(name, condition, enabled, updatedBy)
|
||||
return module.store.UpdateGroup(ctx, group)
|
||||
|
||||
err = module.store.UpdateGroup(ctx, group)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (module *module) DeleteGroup(ctx context.Context, orgID, id valuer.UUID) error {
|
||||
return module.store.DeleteGroup(ctx, orgID, id)
|
||||
err := module.store.DeleteGroup(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (module *module) ListMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*spantypes.SpanMapper, error) {
|
||||
@@ -54,7 +70,12 @@ func (module *module) CreateMapper(ctx context.Context, orgID, groupID valuer.UU
|
||||
if _, err := module.store.GetGroup(ctx, orgID, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
return module.store.CreateMapper(ctx, mapper)
|
||||
err := module.store.CreateMapper(ctx, mapper)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (module *module) UpdateMapper(ctx context.Context, orgID, groupID, id valuer.UUID, fieldContext spantypes.FieldContext, config *spantypes.SpanMapperConfig, enabled *bool, updatedBy string) error {
|
||||
@@ -66,9 +87,72 @@ func (module *module) UpdateMapper(ctx context.Context, orgID, groupID, id value
|
||||
return err
|
||||
}
|
||||
mapper.Update(fieldContext, config, enabled, updatedBy)
|
||||
return module.store.UpdateMapper(ctx, mapper)
|
||||
err = module.store.UpdateMapper(ctx, mapper)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (module *module) DeleteMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error {
|
||||
return module.store.DeleteMapper(ctx, orgID, groupID, id)
|
||||
err := module.store.DeleteMapper(ctx, orgID, groupID, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (module *module) AgentFeatureType() agentConf.AgentFeatureType {
|
||||
return spantypes.SpanAttrMappingFeatureType
|
||||
}
|
||||
|
||||
func (module *module) RecommendAgentConfig(orgID valuer.UUID, currentConfYaml []byte, configVersion *opamptypes.AgentConfigVersion) ([]byte, string, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
enabled, err := module.listEnabledGroupsWithMappers(ctx, orgID)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
updatedConf, err := spantypes.GenerateCollectorConfigWithSpanMapperProcessor(currentConfYaml, enabled)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
serialized, err := json.Marshal(enabled)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
return updatedConf, string(serialized), nil
|
||||
}
|
||||
|
||||
// listEnabledGroupsWithMappers returns groups with their mappers.
|
||||
func (module *module) listEnabledGroupsWithMappers(ctx context.Context, orgID valuer.UUID) ([]*spantypes.SpanMapperGroupWithMappers, error) {
|
||||
enabled := true
|
||||
groups, err := module.store.ListGroups(ctx, orgID, &spantypes.ListSpanMapperGroupsQuery{Enabled: &enabled})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make([]*spantypes.SpanMapperGroupWithMappers, 0, len(groups))
|
||||
for _, g := range groups {
|
||||
mappers, err := module.store.ListMappers(ctx, orgID, g.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(mappers) == 0 {
|
||||
continue
|
||||
}
|
||||
enabledMappers := make([]*spantypes.SpanMapper, 0, len(mappers))
|
||||
for _, m := range mappers {
|
||||
if m.Enabled {
|
||||
enabledMappers = append(enabledMappers, m)
|
||||
}
|
||||
}
|
||||
out = append(out, &spantypes.SpanMapperGroupWithMappers{Group: g, Mappers: enabledMappers})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
@@ -4,12 +4,16 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/types/spantypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Module defines the business logic for span attribute mapping groups and mappers.
|
||||
type Module interface {
|
||||
// Since this module interacts with OpAMP, it must implement the AgentFeature interface.
|
||||
agentConf.AgentFeature
|
||||
|
||||
// Group operations
|
||||
ListGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.SpanMapperGroup, error)
|
||||
GetGroup(ctx context.Context, orgID, id valuer.UUID) (*spantypes.SpanMapperGroup, error)
|
||||
|
||||
@@ -1140,8 +1140,6 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
// map[traceID][level]span
|
||||
var selectedSpans = [][]*model.FlamegraphSpan{}
|
||||
var traceRoots []*model.FlamegraphSpan
|
||||
// time bounds for Pass 1 and Pass 2 (set on cache miss, zero on cache hit)
|
||||
var tsBucketStart, tsBucketEnd int64
|
||||
|
||||
// get the trace tree from cache!
|
||||
cachedTraceData, err := r.GetFlamegraphSpansForTraceCache(ctx, orgID, traceID)
|
||||
@@ -1157,59 +1155,62 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
if err != nil {
|
||||
r.logger.Info("cache miss for getFlamegraphSpansForTrace", "traceID", traceID)
|
||||
|
||||
// Inline summary query to get time bounds shared by Pass 1 and Pass 2.
|
||||
var traceSummary model.TraceSummary
|
||||
summaryQuery := fmt.Sprintf(
|
||||
"SELECT trace_id, min(start) AS start, max(end) AS end, sum(num_spans) AS num_spans FROM %s.%s WHERE trace_id=$1 GROUP BY trace_id",
|
||||
r.TraceDB, r.traceSummaryTable)
|
||||
if summaryErr := r.db.QueryRow(ctx, summaryQuery, traceID).Scan(
|
||||
&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans,
|
||||
); summaryErr != nil {
|
||||
if summaryErr == sql.ErrNoRows {
|
||||
return trace, nil
|
||||
}
|
||||
r.logger.Error("Error in processing flamegraph trace summary sql query", errorsV2.Attr(summaryErr))
|
||||
return nil, model.ExecutionError(fmt.Errorf("getFlamegraphSpansForTrace: error querying trace summary: %w", summaryErr))
|
||||
selectCols := "timestamp, duration_nano, span_id, trace_id, has_error, links as references, resource_string_service$$name, name, events"
|
||||
if len(req.SelectFields) > 0 {
|
||||
selectCols += ", attributes_string, attributes_number, attributes_bool, resources_string"
|
||||
}
|
||||
tsBucketStart = traceSummary.Start.Unix() - 1800
|
||||
tsBucketEnd = traceSummary.End.Unix()
|
||||
flamegraphQuery := fmt.Sprintf("SELECT %s FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", selectCols, r.TraceDB, r.traceTableName)
|
||||
|
||||
// Pass 1: skeleton query — no events, no attribute maps.
|
||||
// Keeps tree-building memory lean; events are fetched in Pass 2 only for
|
||||
// the windowed spans that are actually returned in the response.
|
||||
skeletonQuery := fmt.Sprintf(
|
||||
"SELECT DISTINCT ON (span_id) timestamp, duration_nano, span_id, parent_span_id, has_error, resource_string_service$$name, name FROM %s.%s WHERE trace_id=$1 AND ts_bucket_start>=$2 AND ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC",
|
||||
r.TraceDB, r.traceTableName)
|
||||
|
||||
var skeletonSpans []model.SpanItemV2
|
||||
if skeletonErr := r.db.Select(ctx, &skeletonSpans, skeletonQuery, traceID,
|
||||
strconv.FormatInt(tsBucketStart, 10), strconv.FormatInt(tsBucketEnd, 10),
|
||||
); skeletonErr != nil {
|
||||
r.logger.Error("Error in processing flamegraph skeleton sql query", errorsV2.Attr(skeletonErr))
|
||||
return nil, model.ExecutionError(fmt.Errorf("getFlamegraphSpansForTrace: error querying skeleton spans: %w", skeletonErr))
|
||||
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, flamegraphQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(skeletonSpans) == 0 {
|
||||
if len(searchScanResponses) == 0 {
|
||||
return trace, nil
|
||||
}
|
||||
|
||||
for _, item := range skeletonSpans {
|
||||
for _, item := range searchScanResponses {
|
||||
ref := []model.OtelSpanRef{}
|
||||
err := json.Unmarshal([]byte(item.References), &ref)
|
||||
if err != nil {
|
||||
r.logger.Error("Error unmarshalling references", errorsV2.Attr(err))
|
||||
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getFlamegraphSpansForTrace: error in unmarshalling references %s", err.Error())
|
||||
}
|
||||
|
||||
events := make([]model.Event, 0)
|
||||
for _, event := range item.Events {
|
||||
var eventMap model.Event
|
||||
err = json.Unmarshal([]byte(event), &eventMap)
|
||||
if err != nil {
|
||||
r.logger.Error("Error unmarshalling events", errorsV2.Attr(err))
|
||||
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getFlamegraphSpansForTrace: error in unmarshalling events %s", err.Error())
|
||||
}
|
||||
events = append(events, eventMap)
|
||||
}
|
||||
|
||||
jsonItem := model.FlamegraphSpan{
|
||||
SpanID: item.SpanID,
|
||||
TraceID: item.TraceID,
|
||||
ServiceName: item.ServiceName,
|
||||
Name: item.Name,
|
||||
DurationNano: item.DurationNano,
|
||||
HasError: item.HasError,
|
||||
ParentSpanID: item.ParentSpanId,
|
||||
References: ref,
|
||||
Events: events,
|
||||
Children: make([]*model.FlamegraphSpan, 0),
|
||||
}
|
||||
|
||||
if len(req.SelectFields) > 0 {
|
||||
jsonItem.SetRequestedFields(item, req.SelectFields)
|
||||
}
|
||||
|
||||
// metadata calculation
|
||||
startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
|
||||
if startTime == 0 || startTimeUnixNano < startTime {
|
||||
startTime = startTimeUnixNano
|
||||
}
|
||||
if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
|
||||
endTime = startTimeUnixNano + jsonItem.DurationNano
|
||||
endTime = (startTimeUnixNano + jsonItem.DurationNano)
|
||||
}
|
||||
if durationNano == 0 || jsonItem.DurationNano > durationNano {
|
||||
durationNano = jsonItem.DurationNano
|
||||
@@ -1218,34 +1219,41 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000)
|
||||
spanIdToSpanNodeMap[jsonItem.SpanID] = &jsonItem
|
||||
}
|
||||
skeletonSpans = nil
|
||||
|
||||
// build parent-child tree using parent_span_id; insert placeholders for missing parents
|
||||
// traverse through the map and append each node to the children array of the parent node
|
||||
// and add missing spans
|
||||
for _, spanNode := range spanIdToSpanNodeMap {
|
||||
if spanNode.ParentSpanID == "" {
|
||||
traceRoots = append(traceRoots, spanNode)
|
||||
} else if parentNode, exists := spanIdToSpanNodeMap[spanNode.ParentSpanID]; exists {
|
||||
parentNode.Children = append(parentNode.Children, spanNode)
|
||||
} else {
|
||||
if _, alreadyCreated := spanIdToSpanNodeMap[spanNode.ParentSpanID]; !alreadyCreated {
|
||||
missingSpan := &model.FlamegraphSpan{
|
||||
SpanID: spanNode.ParentSpanID,
|
||||
Name: "Missing Span",
|
||||
TimeUnixNano: spanNode.TimeUnixNano,
|
||||
DurationNano: spanNode.DurationNano,
|
||||
Events: make([]model.Event, 0),
|
||||
Children: make([]*model.FlamegraphSpan, 0),
|
||||
hasParentSpanNode := false
|
||||
for _, reference := range spanNode.References {
|
||||
if reference.RefType == "CHILD_OF" && reference.SpanId != "" {
|
||||
hasParentSpanNode = true
|
||||
if parentNode, exists := spanIdToSpanNodeMap[reference.SpanId]; exists {
|
||||
parentNode.Children = append(parentNode.Children, spanNode)
|
||||
} else {
|
||||
// insert the missing spans
|
||||
missingSpan := model.FlamegraphSpan{
|
||||
SpanID: reference.SpanId,
|
||||
TraceID: spanNode.TraceID,
|
||||
ServiceName: "",
|
||||
Name: "Missing Span",
|
||||
TimeUnixNano: spanNode.TimeUnixNano,
|
||||
DurationNano: spanNode.DurationNano,
|
||||
HasError: false,
|
||||
Events: make([]model.Event, 0),
|
||||
Children: make([]*model.FlamegraphSpan, 0),
|
||||
}
|
||||
missingSpan.Children = append(missingSpan.Children, spanNode)
|
||||
spanIdToSpanNodeMap[missingSpan.SpanID] = &missingSpan
|
||||
traceRoots = append(traceRoots, &missingSpan)
|
||||
}
|
||||
spanIdToSpanNodeMap[missingSpan.SpanID] = missingSpan
|
||||
traceRoots = append(traceRoots, missingSpan)
|
||||
}
|
||||
spanIdToSpanNodeMap[spanNode.ParentSpanID].Children = append(
|
||||
spanIdToSpanNodeMap[spanNode.ParentSpanID].Children, spanNode)
|
||||
}
|
||||
if !hasParentSpanNode && !tracedetail.ContainsFlamegraphSpan(traceRoots, spanNode) {
|
||||
traceRoots = append(traceRoots, spanNode)
|
||||
}
|
||||
}
|
||||
|
||||
selectedSpans = tracedetail.GetAllSpansForFlamegraph(traceRoots, spanIdToSpanNodeMap)
|
||||
spanIdToSpanNodeMap = nil
|
||||
|
||||
// TODO: set the trace data (model.GetFlamegraphSpansForTraceCache) in cache here
|
||||
// removed existing cache usage since it was not getting used due to this bug https://github.com/SigNoz/engineering-pod/issues/4648
|
||||
@@ -1268,74 +1276,6 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
}
|
||||
r.logger.Debug("getFlamegraphSpansForTrace: processing post cache", "duration", time.Since(processingPostCache), "traceID", traceID, "totalSpans", totalSpanCount, "limit", clientLimit)
|
||||
|
||||
// Pass 2: hydrate events and requested attribute fields only for the selected window spans.
|
||||
// tsBucketStart is non-zero only when we performed a DB fetch (cache miss path).
|
||||
if err != nil && tsBucketStart != 0 {
|
||||
needsAttrMaps := false
|
||||
needsResourceMap := false
|
||||
for _, f := range req.SelectFields {
|
||||
if f.FieldContext == telemetrytypes.FieldContextAttribute {
|
||||
needsAttrMaps = true
|
||||
}
|
||||
if f.FieldContext == telemetrytypes.FieldContextResource {
|
||||
needsResourceMap = true
|
||||
}
|
||||
}
|
||||
|
||||
selectedSpanIDs := make([]string, 0)
|
||||
selectedSpanMap := make(map[string]*model.FlamegraphSpan)
|
||||
for _, level := range selectedSpansForRequest {
|
||||
for _, span := range level {
|
||||
selectedSpanIDs = append(selectedSpanIDs, span.SpanID)
|
||||
selectedSpanMap[span.SpanID] = span
|
||||
}
|
||||
}
|
||||
|
||||
if len(selectedSpanIDs) > 0 {
|
||||
hydrateCols := "span_id, events"
|
||||
if needsAttrMaps {
|
||||
hydrateCols += ", attributes_string, attributes_number, attributes_bool"
|
||||
}
|
||||
if needsResourceMap {
|
||||
hydrateCols += ", resources_string"
|
||||
}
|
||||
hydrateQuery := fmt.Sprintf(
|
||||
"SELECT %s FROM %s.%s WHERE trace_id=@traceID AND ts_bucket_start>=@tsStart AND ts_bucket_start<=@tsEnd AND span_id IN @spanIDs",
|
||||
hydrateCols, r.TraceDB, r.traceTableName)
|
||||
|
||||
var hydrateRows []model.SpanItemV2
|
||||
if hydrateErr := r.db.Select(ctx, &hydrateRows, hydrateQuery,
|
||||
clickhouse.Named("traceID", traceID),
|
||||
clickhouse.Named("tsStart", tsBucketStart),
|
||||
clickhouse.Named("tsEnd", tsBucketEnd),
|
||||
clickhouse.Named("spanIDs", selectedSpanIDs),
|
||||
); hydrateErr != nil {
|
||||
r.logger.Error("Error in processing flamegraph hydration sql query", errorsV2.Attr(hydrateErr))
|
||||
return nil, model.ExecutionError(fmt.Errorf("getFlamegraphSpansForTrace: error querying events: %w", hydrateErr))
|
||||
}
|
||||
|
||||
for _, item := range hydrateRows {
|
||||
span, ok := selectedSpanMap[item.SpanID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
events := make([]model.Event, 0, len(item.Events))
|
||||
for _, event := range item.Events {
|
||||
var eventMap model.Event
|
||||
if unmarshalErr := json.Unmarshal([]byte(event), &eventMap); unmarshalErr != nil {
|
||||
r.logger.Error("Error unmarshalling events", errorsV2.Attr(unmarshalErr))
|
||||
return nil, errorsV2.Newf(errorsV2.TypeInternal, errorsV2.CodeInternal, "getFlamegraphSpansForTrace: error in unmarshalling events %s", unmarshalErr.Error())
|
||||
}
|
||||
events = append(events, eventMap)
|
||||
}
|
||||
span.Events = events
|
||||
if len(req.SelectFields) > 0 {
|
||||
span.SetRequestedFields(item, req.SelectFields)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace.Spans = selectedSpansForRequest
|
||||
trace.StartTimestampMillis = startTime / 1000000
|
||||
trace.EndTimestampMillis = endTime / 1000000
|
||||
|
||||
@@ -297,13 +297,14 @@ type FlamegraphSpan struct {
|
||||
TimeUnixNano uint64 `json:"timestamp"`
|
||||
DurationNano uint64 `json:"durationNano"`
|
||||
SpanID string `json:"spanId"`
|
||||
TraceID string `json:"traceId"`
|
||||
HasError bool `json:"hasError"`
|
||||
ServiceName string `json:"serviceName"`
|
||||
Name string `json:"name"`
|
||||
Level int64 `json:"level"`
|
||||
ParentSpanID string `json:"parentSpanId"`
|
||||
Events []Event `json:"event"`
|
||||
Children []*FlamegraphSpan `json:"-"`
|
||||
References []OtelSpanRef `json:"references,omitempty"`
|
||||
Children []*FlamegraphSpan `json:"children"`
|
||||
Attributes map[string]any `json:"attributes,omitempty"`
|
||||
Resource map[string]string `json:"resource,omitempty"`
|
||||
}
|
||||
|
||||
@@ -13,7 +13,9 @@ var (
|
||||
ErrCodeMappingGroupAlreadyExists = errors.MustNewCode("span_attribute_mapping_group_already_exists")
|
||||
)
|
||||
|
||||
// A group runs when any of the listed attribute/resource key patterns match.
|
||||
// SpanMapperGroupCondition gates whether a group's rules run for a given span.
|
||||
// A group runs when any attribute or resource key on the span CONTAINS one of
|
||||
// the listed substrings (plain substring match — no glob syntax).
|
||||
type SpanMapperGroupCondition struct {
|
||||
Attributes []string `json:"attributes" required:"true" nullable:"true"`
|
||||
Resource []string `json:"resource" required:"true" nullable:"true"`
|
||||
|
||||
144
pkg/types/spantypes/spanmapperprocessor.go
Normal file
144
pkg/types/spantypes/spanmapperprocessor.go
Normal file
@@ -0,0 +1,144 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/goccy/go-yaml"
|
||||
)
|
||||
|
||||
const (
|
||||
SpanAttrMappingFeatureType agentConf.AgentFeatureType = "span_attr_mapping"
|
||||
|
||||
ProcessorName = "signozspanmapper"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeInvalidCollectorConfig = errors.MustNewCode("invalid_collector_config")
|
||||
ErrCodeBuildMappingProcessorConfig = errors.MustNewCode("build_mapping_processor_config")
|
||||
)
|
||||
|
||||
type SpanMapperGroupWithMappers struct {
|
||||
Group *SpanMapperGroup `json:"group"`
|
||||
Mappers []*SpanMapper `json:"mappers"`
|
||||
}
|
||||
|
||||
// spanMapperProcessorConfig is the collector config for signozspanmapper.
|
||||
type spanMapperProcessorConfig struct {
|
||||
Groups []spanMapperProcessorGroup `yaml:"groups" json:"groups"`
|
||||
}
|
||||
|
||||
type spanMapperProcessorGroup struct {
|
||||
ID string `yaml:"id" json:"id"`
|
||||
ExistsAny spanMapperProcessorExistsAny `yaml:"exists_any" json:"exists_any"`
|
||||
Attributes []spanMapperProcessorAttribute `yaml:"attributes" json:"attributes"`
|
||||
}
|
||||
|
||||
type spanMapperProcessorExistsAny struct {
|
||||
Attributes []string `yaml:"attributes,omitempty" json:"attributes,omitempty"`
|
||||
Resource []string `yaml:"resource,omitempty" json:"resource,omitempty"`
|
||||
}
|
||||
|
||||
type spanMapperProcessorAttribute struct {
|
||||
Target string `yaml:"target" json:"target"`
|
||||
Context string `yaml:"context,omitempty" json:"context,omitempty"`
|
||||
Action string `yaml:"action,omitempty" json:"action,omitempty"`
|
||||
Sources []string `yaml:"sources" json:"sources"`
|
||||
}
|
||||
|
||||
func GenerateCollectorConfigWithSpanMapperProcessor(currentConfYaml []byte, groups []*SpanMapperGroupWithMappers) ([]byte, error) {
|
||||
var collectorConf map[string]any
|
||||
if err := yaml.Unmarshal(currentConfYaml, &collectorConf); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInvalidInput, ErrCodeInvalidCollectorConfig, "failed to unmarshal collector config")
|
||||
}
|
||||
// rare but don't do anything in this case, also means it's just comments.
|
||||
if collectorConf == nil {
|
||||
collectorConf = map[string]any{}
|
||||
}
|
||||
|
||||
processors := map[string]any{}
|
||||
if existing, ok := collectorConf["processors"]; ok && existing != nil {
|
||||
p, ok := existing.(map[string]any)
|
||||
if !ok {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, ErrCodeInvalidCollectorConfig, "collector config 'processors' must be a mapping, got %T", existing)
|
||||
}
|
||||
processors = p
|
||||
}
|
||||
|
||||
procConfig := buildProcessorConfig(groups)
|
||||
configBytes, err := yaml.Marshal(procConfig)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeBuildMappingProcessorConfig, "failed to marshal span mapper processor config")
|
||||
}
|
||||
var configMap any
|
||||
if err := yaml.Unmarshal(configBytes, &configMap); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeBuildMappingProcessorConfig, "failed to re-unmarshal span mapper processor config")
|
||||
}
|
||||
|
||||
processors[ProcessorName] = configMap
|
||||
collectorConf["processors"] = processors
|
||||
|
||||
out, err := yaml.Marshal(collectorConf)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeBuildMappingProcessorConfig, "failed to marshal collector config")
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func buildProcessorConfig(groups []*SpanMapperGroupWithMappers) *spanMapperProcessorConfig {
|
||||
out := make([]spanMapperProcessorGroup, 0, len(groups))
|
||||
|
||||
for _, gm := range groups {
|
||||
rules := make([]spanMapperProcessorAttribute, 0, len(gm.Mappers))
|
||||
for _, m := range gm.Mappers {
|
||||
rules = append(rules, buildAttributeRule(m))
|
||||
}
|
||||
|
||||
out = append(out, spanMapperProcessorGroup{
|
||||
ID: gm.Group.Name,
|
||||
ExistsAny: spanMapperProcessorExistsAny{
|
||||
Attributes: gm.Group.Condition.Attributes,
|
||||
Resource: gm.Group.Condition.Resource,
|
||||
},
|
||||
Attributes: rules,
|
||||
})
|
||||
}
|
||||
|
||||
return &spanMapperProcessorConfig{Groups: out}
|
||||
}
|
||||
|
||||
// buildAttributeRule maps a single SpanMapper to a collector attribute rule.
|
||||
// Sources are sorted by Priority DESC (highest-priority first), and read-from-
|
||||
// resource sources are encoded via the "resource." prefix.
|
||||
func buildAttributeRule(m *SpanMapper) spanMapperProcessorAttribute {
|
||||
sources := make([]SpanMapperSource, len(m.Config.Sources))
|
||||
copy(sources, m.Config.Sources)
|
||||
sort.SliceStable(sources, func(i, j int) bool { return sources[i].Priority > sources[j].Priority })
|
||||
|
||||
keys := make([]string, 0, len(sources))
|
||||
for _, s := range sources {
|
||||
if s.Context == FieldContextResource {
|
||||
keys = append(keys, FieldContextResource.StringValue()+"."+s.Key)
|
||||
} else {
|
||||
keys = append(keys, s.Key)
|
||||
}
|
||||
}
|
||||
|
||||
action := SpanMapperOperationCopy
|
||||
if len(sources) > 0 && sources[0].Operation == SpanMapperOperationMove {
|
||||
action = SpanMapperOperationMove
|
||||
}
|
||||
|
||||
ctx := FieldContextSpanAttribute
|
||||
if m.FieldContext == FieldContextResource {
|
||||
ctx = FieldContextResource
|
||||
}
|
||||
|
||||
return spanMapperProcessorAttribute{
|
||||
Target: m.Name,
|
||||
Context: ctx.StringValue(),
|
||||
Action: action.StringValue(),
|
||||
Sources: keys,
|
||||
}
|
||||
}
|
||||
193
pkg/types/spantypes/spanmapperprocessor_test.go
Normal file
193
pkg/types/spantypes/spanmapperprocessor_test.go
Normal file
@@ -0,0 +1,193 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
func TestGenerateCollectorConfigWithSpanMapperProcessor(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
baseline := loadFixture(t, "collector_baseline.yaml")
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
groups []*SpanMapperGroupWithMappers
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "no_groups",
|
||||
want: "collector_no_groups.yaml",
|
||||
},
|
||||
{
|
||||
name: "with_groups",
|
||||
groups: []*SpanMapperGroupWithMappers{
|
||||
{
|
||||
Group: newGroup("llm", []string{"model"}, []string{"service.name"}),
|
||||
Mappers: []*SpanMapper{
|
||||
newMapper("gen_ai.request.model", FieldContextResource,
|
||||
attrSrc("gen_ai.llm.model", SpanMapperOperationCopy, 3),
|
||||
attrSrc("llm.model", SpanMapperOperationCopy, 2),
|
||||
resSrc("service.name", SpanMapperOperationCopy, 1),
|
||||
),
|
||||
newMapper("gen_ai.request.tokens", FieldContextSpanAttribute,
|
||||
attrSrc("gen_ai.request_tokens", SpanMapperOperationCopy, 2),
|
||||
attrSrc("llm.tokens", SpanMapperOperationCopy, 1),
|
||||
),
|
||||
newMapper("gen_ai.request.input", FieldContextSpanAttribute,
|
||||
attrSrc("gen_ai.input", SpanMapperOperationMove, 2),
|
||||
attrSrc("llm.input", SpanMapperOperationMove, 1),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
Group: newGroup("agent", []string{"agent."}, nil),
|
||||
Mappers: []*SpanMapper{
|
||||
newMapper("gen_ai.agent.name", FieldContextSpanAttribute,
|
||||
attrSrc("agent.name", SpanMapperOperationCopy, 2),
|
||||
attrSrc("llm.agent.name", SpanMapperOperationCopy, 1),
|
||||
),
|
||||
newMapper("gen_ai.agent.id", FieldContextSpanAttribute,
|
||||
attrSrc("gen_ai.agent.id", SpanMapperOperationCopy, 2),
|
||||
attrSrc("llm.agent.id", SpanMapperOperationCopy, 1),
|
||||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
Group: newGroup("tool", []string{"agent."}, nil),
|
||||
Mappers: []*SpanMapper{
|
||||
newMapper("gen_ai.tool.name", FieldContextSpanAttribute,
|
||||
attrSrc("ai.tool.name", SpanMapperOperationCopy, 2),
|
||||
attrSrc("llm.tool.name", SpanMapperOperationCopy, 1),
|
||||
),
|
||||
},
|
||||
},
|
||||
},
|
||||
want: "collector_with_groups.yaml",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
got, err := GenerateCollectorConfigWithSpanMapperProcessor(baseline, tc.groups)
|
||||
require.NoError(t, err)
|
||||
assertYAMLEqual(t, loadFixture(t, tc.want), got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGenerateCollectorConfigWithSpanMapperProcessor_Errors(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
in []byte
|
||||
}{
|
||||
{"processors_not_a_map", []byte("processors: not-a-map\n")},
|
||||
{"malformed_yaml", []byte(": :")},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
_, err := GenerateCollectorConfigWithSpanMapperProcessor(tc.in, nil)
|
||||
require.Error(t, err)
|
||||
assert.True(t, errors.Ast(err, errors.TypeInvalidInput), "want TypeInvalidInput, got %v", err)
|
||||
assert.True(t, errors.Asc(err, ErrCodeInvalidCollectorConfig), "want ErrCodeInvalidCollectorConfig, got %v", err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildAttributeRule(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
mapper *SpanMapper
|
||||
want spanMapperProcessorAttribute
|
||||
}{
|
||||
{
|
||||
name: "priority_sort_and_resource_prefix",
|
||||
mapper: newMapper("gen_ai.request.model", FieldContextResource,
|
||||
attrSrc("llm.model", SpanMapperOperationCopy, 20),
|
||||
resSrc("service.name", SpanMapperOperationCopy, 10),
|
||||
attrSrc("gen_ai.llm.model", SpanMapperOperationCopy, 30),
|
||||
),
|
||||
want: spanMapperProcessorAttribute{
|
||||
Target: "gen_ai.request.model",
|
||||
Context: FieldContextResource.StringValue(),
|
||||
Action: SpanMapperOperationCopy.StringValue(),
|
||||
Sources: []string{"gen_ai.llm.model", "llm.model", "resource.service.name"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "move_action_follows_highest_priority_source",
|
||||
mapper: newMapper("gen_ai.request.input", FieldContextSpanAttribute,
|
||||
attrSrc("gen_ai.input", SpanMapperOperationMove, 20),
|
||||
attrSrc("llm.input", SpanMapperOperationMove, 10),
|
||||
),
|
||||
want: spanMapperProcessorAttribute{
|
||||
Target: "gen_ai.request.input",
|
||||
Context: FieldContextSpanAttribute.StringValue(),
|
||||
Action: SpanMapperOperationMove.StringValue(),
|
||||
Sources: []string{"gen_ai.input", "llm.input"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
assert.Equal(t, tc.want, buildAttributeRule(tc.mapper))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func loadFixture(t *testing.T, name string) []byte {
|
||||
t.Helper()
|
||||
b, err := os.ReadFile(filepath.Join("testdata", name))
|
||||
require.NoError(t, err)
|
||||
return b
|
||||
}
|
||||
|
||||
// assertYAMLEqual compares two YAML documents structurally so key order and
|
||||
// slice formatting do not matter.
|
||||
func assertYAMLEqual(t *testing.T, want, got []byte) {
|
||||
t.Helper()
|
||||
var w, g any
|
||||
require.NoError(t, yaml.Unmarshal(want, &w))
|
||||
require.NoError(t, yaml.Unmarshal(got, &g))
|
||||
assert.Equal(t, w, g)
|
||||
}
|
||||
|
||||
func newGroup(name string, attrs, res []string) *SpanMapperGroup {
|
||||
return &SpanMapperGroup{
|
||||
Name: name,
|
||||
Condition: SpanMapperGroupCondition{Attributes: attrs, Resource: res},
|
||||
Enabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
func newMapper(name string, target FieldContext, sources ...SpanMapperSource) *SpanMapper {
|
||||
return &SpanMapper{
|
||||
Name: name,
|
||||
FieldContext: target,
|
||||
Config: SpanMapperConfig{Sources: sources},
|
||||
Enabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
func attrSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
|
||||
return SpanMapperSource{Key: key, Context: FieldContextSpanAttribute, Operation: op, Priority: priority}
|
||||
}
|
||||
|
||||
func resSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
|
||||
return SpanMapperSource{Key: key, Context: FieldContextResource, Operation: op, Priority: priority}
|
||||
}
|
||||
17
pkg/types/spantypes/testdata/collector_baseline.yaml
vendored
Normal file
17
pkg/types/spantypes/testdata/collector_baseline.yaml
vendored
Normal file
@@ -0,0 +1,17 @@
|
||||
receivers:
|
||||
otlp:
|
||||
protocols:
|
||||
grpc:
|
||||
processors:
|
||||
signozspanmapper:
|
||||
groups: []
|
||||
batch: {}
|
||||
exporters:
|
||||
otlp:
|
||||
endpoint: localhost:4317
|
||||
service:
|
||||
pipelines:
|
||||
traces:
|
||||
receivers: [otlp]
|
||||
processors: [signozspanmapper, batch]
|
||||
exporters: [otlp]
|
||||
17
pkg/types/spantypes/testdata/collector_no_groups.yaml
vendored
Normal file
17
pkg/types/spantypes/testdata/collector_no_groups.yaml
vendored
Normal file
@@ -0,0 +1,17 @@
|
||||
receivers:
|
||||
otlp:
|
||||
protocols:
|
||||
grpc:
|
||||
processors:
|
||||
signozspanmapper:
|
||||
groups: []
|
||||
batch: {}
|
||||
exporters:
|
||||
otlp:
|
||||
endpoint: localhost:4317
|
||||
service:
|
||||
pipelines:
|
||||
traces:
|
||||
receivers: [otlp]
|
||||
processors: [signozspanmapper, batch]
|
||||
exporters: [otlp]
|
||||
71
pkg/types/spantypes/testdata/collector_with_groups.yaml
vendored
Normal file
71
pkg/types/spantypes/testdata/collector_with_groups.yaml
vendored
Normal file
@@ -0,0 +1,71 @@
|
||||
receivers:
|
||||
otlp:
|
||||
protocols:
|
||||
grpc:
|
||||
processors:
|
||||
signozspanmapper:
|
||||
groups:
|
||||
- id: llm
|
||||
exists_any:
|
||||
attributes:
|
||||
- model
|
||||
resource:
|
||||
- service.name
|
||||
attributes:
|
||||
- target: gen_ai.request.model
|
||||
context: resource
|
||||
action: copy
|
||||
sources:
|
||||
- gen_ai.llm.model
|
||||
- llm.model
|
||||
- resource.service.name
|
||||
- target: gen_ai.request.tokens
|
||||
context: attribute
|
||||
action: copy
|
||||
sources:
|
||||
- gen_ai.request_tokens
|
||||
- llm.tokens
|
||||
- target: gen_ai.request.input
|
||||
context: attribute
|
||||
action: move
|
||||
sources:
|
||||
- gen_ai.input
|
||||
- llm.input
|
||||
- id: agent
|
||||
exists_any:
|
||||
attributes:
|
||||
- agent.
|
||||
attributes:
|
||||
- target: gen_ai.agent.name
|
||||
context: attribute
|
||||
action: copy
|
||||
sources:
|
||||
- agent.name
|
||||
- llm.agent.name
|
||||
- target: gen_ai.agent.id
|
||||
context: attribute
|
||||
action: copy
|
||||
sources:
|
||||
- gen_ai.agent.id
|
||||
- llm.agent.id
|
||||
- id: tool
|
||||
exists_any:
|
||||
attributes:
|
||||
- agent.
|
||||
attributes:
|
||||
- target: gen_ai.tool.name
|
||||
context: attribute
|
||||
action: copy
|
||||
sources:
|
||||
- ai.tool.name
|
||||
- llm.tool.name
|
||||
batch: {}
|
||||
exporters:
|
||||
otlp:
|
||||
endpoint: localhost:4317
|
||||
service:
|
||||
pipelines:
|
||||
traces:
|
||||
receivers: [otlp]
|
||||
processors: [signozspanmapper, batch]
|
||||
exporters: [otlp]
|
||||
Reference in New Issue
Block a user