mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-23 08:30:35 +01:00
Compare commits
2 Commits
feat/event
...
issue_5267
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1e1be670f1 | ||
|
|
d1682f2ab6 |
@@ -6992,6 +6992,16 @@ components:
|
||||
required:
|
||||
- items
|
||||
type: object
|
||||
SpantypesGettableSpanMapperTest:
|
||||
properties:
|
||||
spans:
|
||||
items:
|
||||
$ref: '#/components/schemas/SpantypesSpanMapperTestSpan'
|
||||
nullable: true
|
||||
type: array
|
||||
required:
|
||||
- spans
|
||||
type: object
|
||||
SpantypesGettableTraceAggregations:
|
||||
properties:
|
||||
aggregations:
|
||||
@@ -7079,6 +7089,39 @@ components:
|
||||
- name
|
||||
- condition
|
||||
type: object
|
||||
SpantypesPostableSpanMapperTest:
|
||||
properties:
|
||||
groups:
|
||||
items:
|
||||
$ref: '#/components/schemas/SpantypesPostableSpanMapperTestGroup'
|
||||
nullable: true
|
||||
type: array
|
||||
spans:
|
||||
items:
|
||||
$ref: '#/components/schemas/SpantypesSpanMapperTestSpan'
|
||||
nullable: true
|
||||
type: array
|
||||
required:
|
||||
- spans
|
||||
- groups
|
||||
type: object
|
||||
SpantypesPostableSpanMapperTestGroup:
|
||||
properties:
|
||||
condition:
|
||||
$ref: '#/components/schemas/SpantypesSpanMapperGroupCondition'
|
||||
enabled:
|
||||
type: boolean
|
||||
mappers:
|
||||
items:
|
||||
$ref: '#/components/schemas/SpantypesPostableSpanMapper'
|
||||
nullable: true
|
||||
type: array
|
||||
name:
|
||||
type: string
|
||||
required:
|
||||
- name
|
||||
- condition
|
||||
type: object
|
||||
SpantypesPostableTraceAggregations:
|
||||
properties:
|
||||
aggregations:
|
||||
@@ -7240,6 +7283,17 @@ components:
|
||||
- operation
|
||||
- priority
|
||||
type: object
|
||||
SpantypesSpanMapperTestSpan:
|
||||
properties:
|
||||
attributes:
|
||||
additionalProperties: {}
|
||||
nullable: true
|
||||
type: object
|
||||
resource:
|
||||
additionalProperties: {}
|
||||
nullable: true
|
||||
type: object
|
||||
type: object
|
||||
SpantypesUpdatableSpanMapper:
|
||||
properties:
|
||||
config:
|
||||
@@ -12797,6 +12851,69 @@ paths:
|
||||
summary: Update a span mapper
|
||||
tags:
|
||||
- spanmapper
|
||||
/api/v1/span_mapper_groups/test:
|
||||
post:
|
||||
deprecated: false
|
||||
description: Tests how span mappers would transform sample spans
|
||||
operationId: TestSpanMappers
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/SpantypesPostableSpanMapperTest'
|
||||
responses:
|
||||
"200":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
properties:
|
||||
data:
|
||||
$ref: '#/components/schemas/SpantypesGettableSpanMapperTest'
|
||||
status:
|
||||
type: string
|
||||
required:
|
||||
- status
|
||||
- data
|
||||
type: object
|
||||
description: OK
|
||||
"400":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Bad Request
|
||||
"401":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Unauthorized
|
||||
"403":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Forbidden
|
||||
"404":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Not Found
|
||||
"500":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Internal Server Error
|
||||
security:
|
||||
- api_key:
|
||||
- VIEWER
|
||||
- tokenizer:
|
||||
- VIEWER
|
||||
summary: Test span mappers against sample spans
|
||||
tags:
|
||||
- spanmapper
|
||||
/api/v1/stats:
|
||||
get:
|
||||
deprecated: false
|
||||
|
||||
@@ -8135,6 +8135,44 @@ export interface SpantypesGettableSpanMapperGroupsDTO {
|
||||
items: SpantypesSpanMapperGroupDTO[];
|
||||
}
|
||||
|
||||
export type SpantypesSpanMapperTestSpanDTOAttributesAnyOf = {
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMapperTestSpanDTOAttributes =
|
||||
SpantypesSpanMapperTestSpanDTOAttributesAnyOf | null;
|
||||
|
||||
export type SpantypesSpanMapperTestSpanDTOResourceAnyOf = {
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMapperTestSpanDTOResource =
|
||||
SpantypesSpanMapperTestSpanDTOResourceAnyOf | null;
|
||||
|
||||
export interface SpantypesSpanMapperTestSpanDTO {
|
||||
/**
|
||||
* @type object,null
|
||||
*/
|
||||
attributes?: SpantypesSpanMapperTestSpanDTOAttributes;
|
||||
/**
|
||||
* @type object,null
|
||||
*/
|
||||
resource?: SpantypesSpanMapperTestSpanDTOResource;
|
||||
}
|
||||
|
||||
export interface SpantypesGettableSpanMapperTestDTO {
|
||||
/**
|
||||
* @type array,null
|
||||
*/
|
||||
spans: SpantypesSpanMapperTestSpanDTO[] | null;
|
||||
}
|
||||
|
||||
export enum SpantypesSpanAggregationTypeDTO {
|
||||
span_count = 'span_count',
|
||||
execution_time_percentage = 'execution_time_percentage',
|
||||
@@ -8430,6 +8468,33 @@ export interface SpantypesPostableSpanMapperGroupDTO {
|
||||
name: string;
|
||||
}
|
||||
|
||||
export interface SpantypesPostableSpanMapperTestGroupDTO {
|
||||
condition: SpantypesSpanMapperGroupConditionDTO | null;
|
||||
/**
|
||||
* @type boolean
|
||||
*/
|
||||
enabled?: boolean;
|
||||
/**
|
||||
* @type array,null
|
||||
*/
|
||||
mappers?: SpantypesPostableSpanMapperDTO[] | null;
|
||||
/**
|
||||
* @type string
|
||||
*/
|
||||
name: string;
|
||||
}
|
||||
|
||||
export interface SpantypesPostableSpanMapperTestDTO {
|
||||
/**
|
||||
* @type array,null
|
||||
*/
|
||||
groups: SpantypesPostableSpanMapperTestGroupDTO[] | null;
|
||||
/**
|
||||
* @type array,null
|
||||
*/
|
||||
spans: SpantypesSpanMapperTestSpanDTO[] | null;
|
||||
}
|
||||
|
||||
export interface SpantypesSpanAggregationDTO {
|
||||
aggregation: SpantypesSpanAggregationTypeDTO;
|
||||
field: TelemetrytypesTelemetryFieldKeyDTO;
|
||||
@@ -9798,6 +9863,14 @@ export type UpdateSpanMapperPathParameters = {
|
||||
groupId: string;
|
||||
mapperId: string;
|
||||
};
|
||||
export type TestSpanMappers200 = {
|
||||
data: SpantypesGettableSpanMapperTestDTO;
|
||||
/**
|
||||
* @type string
|
||||
*/
|
||||
status: string;
|
||||
};
|
||||
|
||||
export type GetStats200Data = { [key: string]: unknown };
|
||||
|
||||
export type GetStats200 = {
|
||||
|
||||
@@ -30,8 +30,10 @@ import type {
|
||||
RenderErrorResponseDTO,
|
||||
SpantypesPostableSpanMapperDTO,
|
||||
SpantypesPostableSpanMapperGroupDTO,
|
||||
SpantypesPostableSpanMapperTestDTO,
|
||||
SpantypesUpdatableSpanMapperDTO,
|
||||
SpantypesUpdatableSpanMapperGroupDTO,
|
||||
TestSpanMappers200,
|
||||
UpdateSpanMapperGroupPathParameters,
|
||||
UpdateSpanMapperPathParameters,
|
||||
} from '../sigNoz.schemas';
|
||||
@@ -780,3 +782,86 @@ export const useUpdateSpanMapper = <
|
||||
> => {
|
||||
return useMutation(getUpdateSpanMapperMutationOptions(options));
|
||||
};
|
||||
/**
|
||||
* Tests how span mappers would transform sample spans
|
||||
* @summary Test span mappers against sample spans
|
||||
*/
|
||||
export const testSpanMappers = (
|
||||
spantypesPostableSpanMapperTestDTO?: BodyType<SpantypesPostableSpanMapperTestDTO>,
|
||||
signal?: AbortSignal,
|
||||
) => {
|
||||
return GeneratedAPIInstance<TestSpanMappers200>({
|
||||
url: `/api/v1/span_mapper_groups/test`,
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
data: spantypesPostableSpanMapperTestDTO,
|
||||
signal,
|
||||
});
|
||||
};
|
||||
|
||||
export const getTestSpanMappersMutationOptions = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
TContext = unknown,
|
||||
>(options?: {
|
||||
mutation?: UseMutationOptions<
|
||||
Awaited<ReturnType<typeof testSpanMappers>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesPostableSpanMapperTestDTO> },
|
||||
TContext
|
||||
>;
|
||||
}): UseMutationOptions<
|
||||
Awaited<ReturnType<typeof testSpanMappers>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesPostableSpanMapperTestDTO> },
|
||||
TContext
|
||||
> => {
|
||||
const mutationKey = ['testSpanMappers'];
|
||||
const { mutation: mutationOptions } = options
|
||||
? options.mutation &&
|
||||
'mutationKey' in options.mutation &&
|
||||
options.mutation.mutationKey
|
||||
? options
|
||||
: { ...options, mutation: { ...options.mutation, mutationKey } }
|
||||
: { mutation: { mutationKey } };
|
||||
|
||||
const mutationFn: MutationFunction<
|
||||
Awaited<ReturnType<typeof testSpanMappers>>,
|
||||
{ data?: BodyType<SpantypesPostableSpanMapperTestDTO> }
|
||||
> = (props) => {
|
||||
const { data } = props ?? {};
|
||||
|
||||
return testSpanMappers(data);
|
||||
};
|
||||
|
||||
return { mutationFn, ...mutationOptions };
|
||||
};
|
||||
|
||||
export type TestSpanMappersMutationResult = NonNullable<
|
||||
Awaited<ReturnType<typeof testSpanMappers>>
|
||||
>;
|
||||
export type TestSpanMappersMutationBody =
|
||||
| BodyType<SpantypesPostableSpanMapperTestDTO>
|
||||
| undefined;
|
||||
export type TestSpanMappersMutationError = ErrorType<RenderErrorResponseDTO>;
|
||||
|
||||
/**
|
||||
* @summary Test span mappers against sample spans
|
||||
*/
|
||||
export const useTestSpanMappers = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
TContext = unknown,
|
||||
>(options?: {
|
||||
mutation?: UseMutationOptions<
|
||||
Awaited<ReturnType<typeof testSpanMappers>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesPostableSpanMapperTestDTO> },
|
||||
TContext
|
||||
>;
|
||||
}): UseMutationResult<
|
||||
Awaited<ReturnType<typeof testSpanMappers>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesPostableSpanMapperTestDTO> },
|
||||
TContext
|
||||
> => {
|
||||
return useMutation(getTestSpanMappersMutationOptions(options));
|
||||
};
|
||||
|
||||
@@ -51,6 +51,26 @@ func (provider *provider) addSpanMapperRoutes(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v1/span_mapper_groups/test", handler.New(
|
||||
provider.authzMiddleware.ViewAccess(provider.spanMapperHandler.TestMappers),
|
||||
handler.OpenAPIDef{
|
||||
ID: "TestSpanMappers",
|
||||
Tags: []string{"spanmapper"},
|
||||
Summary: "Test span mappers against sample spans",
|
||||
Description: "Tests how span mappers would transform sample spans",
|
||||
Request: new(spantypes.PostableSpanMapperTest),
|
||||
RequestContentType: "application/json",
|
||||
Response: new(spantypes.GettableSpanMapperTest),
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||
Deprecated: false,
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
|
||||
},
|
||||
)).Methods(http.MethodPost).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}", handler.New(
|
||||
provider.authzMiddleware.AdminAccess(provider.spanMapperHandler.UpdateGroup),
|
||||
handler.OpenAPIDef{
|
||||
|
||||
@@ -273,6 +273,35 @@ func (h *handler) DeleteMapper(rw http.ResponseWriter, r *http.Request) {
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
// TestMappers handles POST /api/v1/span_mapper_groups/test.
|
||||
func (h *handler) TestMappers(rw http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
req := new(spantypes.PostableSpanMapperTest)
|
||||
if err := binding.JSON.BindBody(r.Body, req); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
groups := spantypes.NewSpanMapperGroupsWithMappersFromPostable(orgID, req.Groups)
|
||||
out, err := h.module.TestMappers(ctx, orgID, req.Spans, groups)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, &spantypes.GettableSpanMapperTest{Spans: out})
|
||||
}
|
||||
|
||||
// groupIDFromPath extracts and validates the {id} or {groupId} path variable.
|
||||
func groupIDFromPath(r *http.Request) (valuer.UUID, error) {
|
||||
vars := mux.Vars(r)
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
@@ -102,6 +103,53 @@ func (module *module) DeleteMapper(ctx context.Context, orgID, groupID, id value
|
||||
return nil
|
||||
}
|
||||
|
||||
func (module *module) TestMappers(ctx context.Context, orgID valuer.UUID, spans []spantypes.SpanMapperTestSpan, groups []*spantypes.SpanMapperGroupWithMappers) ([]spantypes.SpanMapperTestSpan, error) {
|
||||
if len(spans) == 0 {
|
||||
return nil, errors.New(errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "'spans' must contain at least one span")
|
||||
}
|
||||
|
||||
resolved, err := module.backfillMappers(ctx, orgID, groups)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out, _, err := spantypes.SimulateSpanMappersProcessing(ctx, resolved, spans)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// backfillMappers loads saved mappers for any group whose Mappers is nil.
|
||||
func (module *module) backfillMappers(ctx context.Context, orgID valuer.UUID, groups []*spantypes.SpanMapperGroupWithMappers) ([]*spantypes.SpanMapperGroupWithMappers, error) {
|
||||
// Load all the saved groups for this org, so we can look up by name.
|
||||
savedGroups, err := module.store.ListGroups(ctx, orgID, &spantypes.ListSpanMapperGroupsQuery{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
savedByName := make(map[string]*spantypes.SpanMapperGroup, len(savedGroups))
|
||||
for _, g := range savedGroups {
|
||||
savedByName[g.Name] = g
|
||||
}
|
||||
|
||||
// For each group in the request, if Mappers is nil, load the saved mappers for that group name.
|
||||
for _, g := range groups {
|
||||
if g.Mappers != nil {
|
||||
continue
|
||||
}
|
||||
saved, ok := savedByName[g.Group.Name]
|
||||
if !ok {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, spantypes.ErrCodeMappingGroupNotFound, "no saved group named %q to load mappers from; send 'mappers' for new or edited groups", g.Group.Name)
|
||||
}
|
||||
loaded, err := module.store.ListMappers(ctx, orgID, saved.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
g.Mappers = loaded
|
||||
}
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
func (module *module) AgentFeatureType() agentConf.AgentFeatureType {
|
||||
return spantypes.SpanAttrMappingFeatureType
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ type Module interface {
|
||||
CreateMapper(ctx context.Context, orgID, groupID valuer.UUID, mapper *spantypes.SpanMapper) error
|
||||
UpdateMapper(ctx context.Context, orgID, groupID, id valuer.UUID, fieldContext spantypes.FieldContext, config *spantypes.SpanMapperConfig, enabled *bool, updatedBy string) error
|
||||
DeleteMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error
|
||||
TestMappers(ctx context.Context, orgID valuer.UUID, spans []spantypes.SpanMapperTestSpan, groups []*spantypes.SpanMapperGroupWithMappers) ([]spantypes.SpanMapperTestSpan, error)
|
||||
}
|
||||
|
||||
// Handler defines the HTTP handler interface for mapping group and mapper endpoints.
|
||||
@@ -42,4 +43,5 @@ type Handler interface {
|
||||
CreateMapper(rw http.ResponseWriter, r *http.Request)
|
||||
UpdateMapper(rw http.ResponseWriter, r *http.Request)
|
||||
DeleteMapper(rw http.ResponseWriter, r *http.Request)
|
||||
TestMappers(rw http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
|
||||
193
pkg/types/spantypes/spanmappersimulator.go
Normal file
193
pkg/types/spantypes/spanmappersimulator.go
Normal file
@@ -0,0 +1,193 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz-otel-collector/pkg/collectorsimulator"
|
||||
"github.com/SigNoz/signoz-otel-collector/processor/signozspanmapperprocessor"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"go.opentelemetry.io/collector/otelcol"
|
||||
"go.opentelemetry.io/collector/pdata/ptrace"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeProcessorFactoryMapFailed = errors.MustNewCode("processor_factory_map_failed")
|
||||
ErrCodeSpanMapperSimulationFailed = errors.MustNewCode("span_mapper_simulation_failed")
|
||||
)
|
||||
|
||||
const spanInputOrderAttr = "__signoz_input_idx__"
|
||||
|
||||
// SimulateSpanMappersProcessing runs the given spans through an in-memory
|
||||
// collector pipeline that hosts signozspanmapperprocessor configured by the
|
||||
// supplied groups, and returns the transformed spans. Mirrors
|
||||
// SimulatePipelinesProcessing in pkg/query-service/app/logparsingpipeline.
|
||||
func SimulateSpanMappersProcessing(ctx context.Context, groups []*SpanMapperGroupWithMappers, spans []SpanMapperTestSpan) ([]SpanMapperTestSpan, []string, error) {
|
||||
enabled := filterEnabledGroupsWithMappers(groups)
|
||||
if len(enabled) < 1 {
|
||||
return spans, nil, nil
|
||||
}
|
||||
|
||||
for i := range spans {
|
||||
if spans[i].Attributes == nil {
|
||||
spans[i].Attributes = map[string]any{}
|
||||
}
|
||||
spans[i].Attributes[spanInputOrderAttr] = int64(i)
|
||||
}
|
||||
simulatorInput := SpansToPTraces(spans)
|
||||
|
||||
processorFactories, err := otelcol.MakeFactoryMap(signozspanmapperprocessor.NewFactory())
|
||||
if err != nil {
|
||||
return nil, nil, errors.WrapInternalf(err, ErrCodeProcessorFactoryMapFailed, "could not construct processor factory map")
|
||||
}
|
||||
|
||||
configGenerator := func(baseConf []byte) ([]byte, error) {
|
||||
withProcessor, err := GenerateCollectorConfigWithSpanMapperProcessor(baseConf, enabled)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return wireSpanMapperIntoTracesPipeline(withProcessor)
|
||||
}
|
||||
|
||||
// signozspanmapperprocessor does no batching; spans flow through immediately.
|
||||
timeout := 200 * time.Millisecond
|
||||
|
||||
outputTraces, collectorErrs, simErr := collectorsimulator.SimulateTracesProcessing(
|
||||
ctx,
|
||||
processorFactories,
|
||||
configGenerator,
|
||||
simulatorInput,
|
||||
timeout,
|
||||
)
|
||||
if simErr != nil {
|
||||
if errors.Is(simErr, collectorsimulator.ErrInvalidConfig) {
|
||||
return nil, nil, errors.WrapInvalidInputf(simErr, errors.CodeInvalidInput, "invalid config")
|
||||
}
|
||||
return nil, nil, errors.WrapInternalf(simErr, ErrCodeSpanMapperSimulationFailed, "could not simulate span mapper processing")
|
||||
}
|
||||
|
||||
outputSpans := PTracesToSpans(outputTraces)
|
||||
|
||||
sort.Slice(outputSpans, func(i, j int) bool {
|
||||
iIdx, _ := outputSpans[i].Attributes[spanInputOrderAttr].(int64)
|
||||
jIdx, _ := outputSpans[j].Attributes[spanInputOrderAttr].(int64)
|
||||
return iIdx < jIdx
|
||||
})
|
||||
for _, s := range outputSpans {
|
||||
delete(s.Attributes, spanInputOrderAttr)
|
||||
}
|
||||
|
||||
collectorWarnAndErrorLogs := []string{}
|
||||
for _, log := range collectorErrs {
|
||||
if log == "" || strings.Contains(log, "featuregate.go") {
|
||||
continue
|
||||
}
|
||||
collectorWarnAndErrorLogs = append(collectorWarnAndErrorLogs, log)
|
||||
}
|
||||
|
||||
return outputSpans, collectorWarnAndErrorLogs, nil
|
||||
}
|
||||
|
||||
// SpansToPTraces packs each input span into its own ptrace.Traces with one
|
||||
// ResourceSpans / ScopeSpans / Span carrying its attribute and resource maps.
|
||||
func SpansToPTraces(spans []SpanMapperTestSpan) []ptrace.Traces {
|
||||
result := make([]ptrace.Traces, 0, len(spans))
|
||||
for _, s := range spans {
|
||||
td := ptrace.NewTraces()
|
||||
rs := td.ResourceSpans().AppendEmpty()
|
||||
if s.Resource != nil {
|
||||
_ = rs.Resource().Attributes().FromRaw(s.Resource)
|
||||
}
|
||||
sl := rs.ScopeSpans().AppendEmpty()
|
||||
span := sl.Spans().AppendEmpty()
|
||||
if s.Attributes != nil {
|
||||
_ = span.Attributes().FromRaw(s.Attributes)
|
||||
}
|
||||
result = append(result, td)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// PTracesToSpans flattens simulator output back into SpanMapperTestSpan: one
|
||||
// entry per individual Span across all ResourceSpans / ScopeSpans.
|
||||
func PTracesToSpans(traces []ptrace.Traces) []SpanMapperTestSpan {
|
||||
result := []SpanMapperTestSpan{}
|
||||
for _, td := range traces {
|
||||
rss := td.ResourceSpans()
|
||||
for i := 0; i < rss.Len(); i++ {
|
||||
rs := rss.At(i)
|
||||
resourceAttrs := rs.Resource().Attributes().AsRaw()
|
||||
ilss := rs.ScopeSpans()
|
||||
for j := 0; j < ilss.Len(); j++ {
|
||||
spans := ilss.At(j).Spans()
|
||||
for k := 0; k < spans.Len(); k++ {
|
||||
result = append(result, SpanMapperTestSpan{
|
||||
Attributes: spans.At(k).Attributes().AsRaw(),
|
||||
Resource: resourceAttrs,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// wireSpanMapperIntoTracesPipeline appends "signozspanmapper" to
|
||||
// service.pipelines.traces.processors so the processor defined by
|
||||
// GenerateCollectorConfigWithSpanMapperProcessor actually runs against the
|
||||
// traces flowing through the simulator. Idempotent: skips appending if the
|
||||
// processor name is already present.
|
||||
func wireSpanMapperIntoTracesPipeline(confYaml []byte) ([]byte, error) {
|
||||
var conf map[string]any
|
||||
if err := yaml.Unmarshal(confYaml, &conf); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInvalidInput, ErrCodeInvalidCollectorConfig, "failed to unmarshal collector config for pipeline wiring")
|
||||
}
|
||||
service, _ := conf["service"].(map[string]any)
|
||||
if service == nil {
|
||||
return confYaml, nil
|
||||
}
|
||||
pipelines, _ := service["pipelines"].(map[string]any)
|
||||
if pipelines == nil {
|
||||
return confYaml, nil
|
||||
}
|
||||
traces, _ := pipelines["traces"].(map[string]any)
|
||||
if traces == nil {
|
||||
return confYaml, nil
|
||||
}
|
||||
|
||||
procs, _ := traces["processors"].([]any)
|
||||
for _, p := range procs {
|
||||
if name, ok := p.(string); ok && name == ProcessorName {
|
||||
return confYaml, nil
|
||||
}
|
||||
}
|
||||
traces["processors"] = append(procs, ProcessorName)
|
||||
|
||||
out, err := yaml.Marshal(conf)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, ErrCodeBuildMappingProcessorConfig, "failed to marshal collector config after pipeline wiring")
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func filterEnabledGroupsWithMappers(groups []*SpanMapperGroupWithMappers) []*SpanMapperGroupWithMappers {
|
||||
out := make([]*SpanMapperGroupWithMappers, 0, len(groups))
|
||||
for _, gm := range groups {
|
||||
if gm == nil || gm.Group == nil || !gm.Group.Enabled {
|
||||
continue
|
||||
}
|
||||
enabled := make([]*SpanMapper, 0, len(gm.Mappers))
|
||||
for _, m := range gm.Mappers {
|
||||
if m != nil && m.Enabled {
|
||||
enabled = append(enabled, m)
|
||||
}
|
||||
}
|
||||
if len(enabled) > 0 {
|
||||
out = append(out, &SpanMapperGroupWithMappers{Group: gm.Group, Mappers: enabled})
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
43
pkg/types/spantypes/spanmappersimulator_test.go
Normal file
43
pkg/types/spantypes/spanmappersimulator_test.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestSimulateSpanMappersProcessing_EndToEnd is an integration test: it spins
|
||||
// up an actual in-memory otel-collector pipeline with signozspanmapperprocessor
|
||||
// and verifies the produced span has the expected target attribute.
|
||||
func TestSimulateSpanMappersProcessing_EndToEnd(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{{
|
||||
Group: &SpanMapperGroup{
|
||||
Name: "llm",
|
||||
Condition: SpanMapperGroupCondition{Attributes: []string{"model"}},
|
||||
Enabled: true,
|
||||
},
|
||||
Mappers: []*SpanMapper{{
|
||||
Name: "gen_ai.request.model",
|
||||
FieldContext: FieldContextSpanAttribute,
|
||||
Config: SpanMapperConfig{Sources: []SpanMapperSource{
|
||||
{Key: "llm.model", Context: FieldContextSpanAttribute, Operation: SpanMapperOperationCopy, Priority: 1},
|
||||
}},
|
||||
Enabled: true,
|
||||
}},
|
||||
}}
|
||||
|
||||
spans := []SpanMapperTestSpan{{Attributes: map[string]any{"llm.model": "gpt-4"}}}
|
||||
|
||||
out, _, err := SimulateSpanMappersProcessing(context.Background(), groups, spans)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, out, 1)
|
||||
assert.Equal(t, "gpt-4", out[0].Attributes["gen_ai.request.model"], "target attribute must be populated by the spanmapper processor")
|
||||
// Source attribute is preserved (copy, not move).
|
||||
assert.Equal(t, "gpt-4", out[0].Attributes["llm.model"])
|
||||
// Order-tracking attribute is stripped from the output.
|
||||
_, hasOrderAttr := out[0].Attributes[spanInputOrderAttr]
|
||||
assert.False(t, hasOrderAttr, "internal ordering attribute must be removed from the response")
|
||||
}
|
||||
53
pkg/types/spantypes/spanmappertest.go
Normal file
53
pkg/types/spantypes/spanmappertest.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type SpanMapperTestSpan struct {
|
||||
Attributes map[string]any `json:"attributes"`
|
||||
Resource map[string]any `json:"resource"`
|
||||
}
|
||||
|
||||
// Mappers is optional because the module can backfill from the store by Group.Name.
|
||||
type PostableSpanMapperTestGroup struct {
|
||||
PostableSpanMapperGroup
|
||||
Mappers []PostableSpanMapper `json:"mappers"`
|
||||
}
|
||||
|
||||
type PostableSpanMapperTest struct {
|
||||
Spans []SpanMapperTestSpan `json:"spans" required:"true"`
|
||||
Groups []PostableSpanMapperTestGroup `json:"groups" required:"true"`
|
||||
}
|
||||
|
||||
type GettableSpanMapperTest struct {
|
||||
Spans []SpanMapperTestSpan `json:"spans" required:"true"`
|
||||
}
|
||||
|
||||
func NewSpanMapperGroupsWithMappersFromPostable(orgID valuer.UUID, in []PostableSpanMapperTestGroup) []*SpanMapperGroupWithMappers {
|
||||
out := make([]*SpanMapperGroupWithMappers, 0, len(in))
|
||||
for _, pg := range in {
|
||||
var mappers []*SpanMapper
|
||||
if pg.Mappers != nil {
|
||||
mappers = make([]*SpanMapper, 0, len(pg.Mappers))
|
||||
for _, pm := range pg.Mappers {
|
||||
mappers = append(mappers, &SpanMapper{
|
||||
Name: pm.Name,
|
||||
FieldContext: pm.FieldContext,
|
||||
Config: pm.Config,
|
||||
Enabled: pm.Enabled,
|
||||
})
|
||||
}
|
||||
}
|
||||
out = append(out, &SpanMapperGroupWithMappers{
|
||||
Group: &SpanMapperGroup{
|
||||
OrgID: orgID,
|
||||
Name: pg.Name,
|
||||
Condition: pg.Condition,
|
||||
Enabled: pg.Enabled,
|
||||
},
|
||||
Mappers: mappers,
|
||||
})
|
||||
}
|
||||
return out
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
@@ -22,7 +21,7 @@ const (
|
||||
// BodyJSONStringSearchPrefix is the prefix used for body JSON search queries.
|
||||
// e.g., "body.status" where "body." is the prefix.
|
||||
BodyJSONStringSearchPrefix = "body."
|
||||
ArraySep = jsontypeexporter.ArraySeparator
|
||||
ArraySep = "[]."
|
||||
ArraySepSuffix = "[]"
|
||||
// TODO(Piyush): Remove once we've migrated to the new array syntax.
|
||||
ArrayAnyIndex = "[*]."
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
@@ -76,7 +75,7 @@ func (n *JSONAccessNode) Alias() string {
|
||||
parentAlias := strings.TrimLeft(n.Parent.Alias(), "`")
|
||||
parentAlias = strings.TrimRight(parentAlias, "`")
|
||||
|
||||
sep := jsontypeexporter.ArraySeparator
|
||||
sep := "[]."
|
||||
if n.Parent.isRoot {
|
||||
sep = "."
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user