Compare commits

..

2 Commits

Author SHA1 Message Date
nityanandagohain
2a50f46387 Merge remote-tracking branch 'origin/main' into feat/test_endpoint_attributemapping 2026-06-17 21:27:15 +05:30
Pradeep Kumar
0e4f5d1c4e feat(spanmapper): add preview endpoint for span attribute mapping
Adds POST /api/v1/span_mapper_groups/preview so users can see what their
attribute mappings will do to a span before/while configuring them.

You send a sample input either a set of attributes, a single OTLP
span, or a full OTLP trace and get back the transformed result in the
same form. By default it runs against all the org's enabled mappings;
pass groupId to scope it to one group.

The actual signozspanmapper collector processor not merged yet
https://github.com/SigNoz/signoz-otel-collector/pull/796
2026-06-15 16:39:01 +05:30
14 changed files with 705 additions and 106 deletions

View File

@@ -7240,6 +7240,41 @@ components:
- operation
- priority
type: object
SpantypesSpanMappingPreviewGroup:
properties:
group:
$ref: '#/components/schemas/SpantypesPostableSpanMapperGroup'
mappers:
items:
$ref: '#/components/schemas/SpantypesPostableSpanMapper'
nullable: true
type: array
required:
- group
- mappers
type: object
SpantypesSpanMappingPreviewRequest:
properties:
groupId:
nullable: true
type: string
groups:
items:
$ref: '#/components/schemas/SpantypesSpanMappingPreviewGroup'
nullable: true
type: array
span:
additionalProperties: {}
nullable: true
type: object
type: object
SpantypesSpanMappingPreviewResponse:
properties:
span:
additionalProperties: {}
nullable: true
type: object
type: object
SpantypesUpdatableSpanMapper:
properties:
config:
@@ -12797,6 +12832,69 @@ paths:
summary: Update a span mapper
tags:
- spanmapper
/api/v1/span_mapper_groups/preview:
post:
deprecated: false
description: Previews how attribute mappings would transform a sample span.
operationId: PreviewSpanMapping
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesSpanMappingPreviewRequest'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesSpanMappingPreviewResponse'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Preview span attribute mapping against a sample span
tags:
- spanmapper
/api/v1/stats:
get:
deprecated: false

View File

@@ -8492,6 +8492,56 @@ export interface SpantypesSpanMapperDTO {
updatedBy?: string;
}
export interface SpantypesSpanMappingPreviewGroupDTO {
group: SpantypesPostableSpanMapperGroupDTO;
/**
* @type array,null
*/
mappers: SpantypesPostableSpanMapperDTO[] | null;
}
export type SpantypesSpanMappingPreviewRequestDTOSpanAnyOf = {
[key: string]: unknown;
};
/**
* @nullable
*/
export type SpantypesSpanMappingPreviewRequestDTOSpan =
SpantypesSpanMappingPreviewRequestDTOSpanAnyOf | null;
export interface SpantypesSpanMappingPreviewRequestDTO {
/**
* @type string,null
*/
groupId?: string | null;
/**
* @type array,null
*/
groups?: SpantypesSpanMappingPreviewGroupDTO[] | null;
/**
* @type object,null
*/
span?: SpantypesSpanMappingPreviewRequestDTOSpan;
}
export type SpantypesSpanMappingPreviewResponseDTOSpanAnyOf = {
[key: string]: unknown;
};
/**
* @nullable
*/
export type SpantypesSpanMappingPreviewResponseDTOSpan =
SpantypesSpanMappingPreviewResponseDTOSpanAnyOf | null;
export interface SpantypesSpanMappingPreviewResponseDTO {
/**
* @type object,null
*/
span?: SpantypesSpanMappingPreviewResponseDTOSpan;
}
export interface SpantypesUpdatableSpanMapperDTO {
config?: SpantypesSpanMapperConfigDTO;
/**
@@ -9798,6 +9848,14 @@ export type UpdateSpanMapperPathParameters = {
groupId: string;
mapperId: string;
};
export type PreviewSpanMapping200 = {
data: SpantypesSpanMappingPreviewResponseDTO;
/**
* @type string
*/
status: string;
};
export type GetStats200Data = { [key: string]: unknown };
export type GetStats200 = {

View File

@@ -27,9 +27,11 @@ import type {
ListSpanMapperGroupsParams,
ListSpanMappers200,
ListSpanMappersPathParameters,
PreviewSpanMapping200,
RenderErrorResponseDTO,
SpantypesPostableSpanMapperDTO,
SpantypesPostableSpanMapperGroupDTO,
SpantypesSpanMappingPreviewRequestDTO,
SpantypesUpdatableSpanMapperDTO,
SpantypesUpdatableSpanMapperGroupDTO,
UpdateSpanMapperGroupPathParameters,
@@ -780,3 +782,86 @@ export const useUpdateSpanMapper = <
> => {
return useMutation(getUpdateSpanMapperMutationOptions(options));
};
/**
* Previews how attribute mappings would transform a sample span.
* @summary Preview span attribute mapping against a sample span
*/
export const previewSpanMapping = (
spantypesSpanMappingPreviewRequestDTO?: BodyType<SpantypesSpanMappingPreviewRequestDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<PreviewSpanMapping200>({
url: `/api/v1/span_mapper_groups/preview`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: spantypesSpanMappingPreviewRequestDTO,
signal,
});
};
export const getPreviewSpanMappingMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof previewSpanMapping>>,
TError,
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof previewSpanMapping>>,
TError,
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
TContext
> => {
const mutationKey = ['previewSpanMapping'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof previewSpanMapping>>,
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> }
> = (props) => {
const { data } = props ?? {};
return previewSpanMapping(data);
};
return { mutationFn, ...mutationOptions };
};
export type PreviewSpanMappingMutationResult = NonNullable<
Awaited<ReturnType<typeof previewSpanMapping>>
>;
export type PreviewSpanMappingMutationBody =
| BodyType<SpantypesSpanMappingPreviewRequestDTO>
| undefined;
export type PreviewSpanMappingMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Preview span attribute mapping against a sample span
*/
export const usePreviewSpanMapping = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof previewSpanMapping>>,
TError,
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof previewSpanMapping>>,
TError,
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
TContext
> => {
return useMutation(getPreviewSpanMappingMutationOptions(options));
};

View File

@@ -51,6 +51,26 @@ func (provider *provider) addSpanMapperRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/preview", handler.New(
provider.authzMiddleware.ViewAccess(provider.spanMapperHandler.PreviewMapping),
handler.OpenAPIDef{
ID: "PreviewSpanMapping",
Tags: []string{"spanmapper"},
Summary: "Preview span attribute mapping against a sample span",
Description: "Previews how attribute mappings would transform a sample span.",
Request: new(spantypes.SpanMappingPreviewRequest),
RequestContentType: "application/json",
Response: new(spantypes.SpanMappingPreviewResponse),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}", handler.New(
provider.authzMiddleware.AdminAccess(provider.spanMapperHandler.UpdateGroup),
handler.OpenAPIDef{

View File

@@ -273,6 +273,35 @@ func (h *handler) DeleteMapper(rw http.ResponseWriter, r *http.Request) {
render.Success(rw, http.StatusNoContent, nil)
}
// PreviewMapping handles POST /api/v1/span_mapper_groups/preview.
// used to get preview of attributes/resources after remapping.
func (h *handler) PreviewMapping(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
req := new(spantypes.SpanMappingPreviewRequest)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.PreviewMapping(ctx, orgID, req)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}
// groupIDFromPath extracts and validates the {id} or {groupId} path variable.
func groupIDFromPath(r *http.Request) (valuer.UUID, error) {
vars := mux.Vars(r)

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
@@ -102,6 +103,97 @@ func (module *module) DeleteMapper(ctx context.Context, orgID, groupID, id value
return nil
}
// PreviewMapping resolves the mappings to preview (from the request body, a
// saved group, or all enabled saved mappings) and returns the input span with
// its "attributes" and "resource" maps transformed.
func (module *module) PreviewMapping(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) (*spantypes.SpanMappingPreviewResponse, error) {
groups, err := module.resolvePreviewGroups(ctx, orgID, req)
if err != nil {
return nil, err
}
if len(req.Span) == 0 {
return nil, errors.New(errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "'span' must be provided")
}
outResource, outSpan := spantypes.SimulateMappingForAttributes(groups, spanAttrMap(req.Span["resource"]), spanAttrMap(req.Span["attributes"]))
result := make(map[string]any, len(req.Span))
for k, v := range req.Span {
result[k] = v
}
setAttrMap(result, "attributes", req.Span, outSpan)
setAttrMap(result, "resource", req.Span, outResource)
return &spantypes.SpanMappingPreviewResponse{Span: result}, nil
}
func spanAttrMap(v any) map[string]any {
if m, ok := v.(map[string]any); ok {
return m
}
return nil
}
func setAttrMap(dst map[string]any, key string, in map[string]any, transformed map[string]any) {
if _, present := in[key]; present || len(transformed) > 0 {
dst[key] = transformed
}
}
// resolvePreviewGroups picks the mappings to preview against: the groups in the
// request body, else a specific saved group (GroupID), else all of the org's
// enabled saved mappings.
func (module *module) resolvePreviewGroups(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) ([]*spantypes.SpanMapperGroupWithMappers, error) {
hasGroups := len(req.Groups) > 0
hasGroupID := req.GroupID != nil && *req.GroupID != ""
if hasGroups && hasGroupID {
return nil, errors.New(errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "provide either 'groups' or 'groupId', not both")
}
if hasGroups {
groups := make([]*spantypes.SpanMapperGroupWithMappers, 0, len(req.Groups))
for _, spec := range req.Groups {
group := &spantypes.SpanMapperGroup{
OrgID: orgID,
Name: spec.Group.Name,
Condition: spec.Group.Condition,
Enabled: spec.Group.Enabled,
}
mappers := make([]*spantypes.SpanMapper, 0, len(spec.Mappers))
for _, pm := range spec.Mappers {
mappers = append(mappers, &spantypes.SpanMapper{
Name: pm.Name,
FieldContext: pm.FieldContext,
Config: pm.Config,
Enabled: pm.Enabled,
})
}
groups = append(groups, &spantypes.SpanMapperGroupWithMappers{Group: group, Mappers: mappers})
}
return groups, nil
}
if hasGroupID {
id, err := valuer.NewUUID(*req.GroupID)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "group id is not a valid uuid")
}
group, err := module.store.GetGroup(ctx, orgID, id)
if err != nil {
return nil, err
}
mappers, err := module.store.ListMappers(ctx, orgID, id)
if err != nil {
return nil, err
}
return []*spantypes.SpanMapperGroupWithMappers{{Group: group, Mappers: mappers}}, nil
}
return module.listEnabledGroupsWithMappers(ctx, orgID)
}
func (module *module) AgentFeatureType() agentConf.AgentFeatureType {
return spantypes.SpanAttrMappingFeatureType
}

View File

@@ -27,6 +27,7 @@ type Module interface {
CreateMapper(ctx context.Context, orgID, groupID valuer.UUID, mapper *spantypes.SpanMapper) error
UpdateMapper(ctx context.Context, orgID, groupID, id valuer.UUID, fieldContext spantypes.FieldContext, config *spantypes.SpanMapperConfig, enabled *bool, updatedBy string) error
DeleteMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error
PreviewMapping(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) (*spantypes.SpanMappingPreviewResponse, error)
}
// Handler defines the HTTP handler interface for mapping group and mapper endpoints.
@@ -42,4 +43,5 @@ type Handler interface {
CreateMapper(rw http.ResponseWriter, r *http.Request)
UpdateMapper(rw http.ResponseWriter, r *http.Request)
DeleteMapper(rw http.ResponseWriter, r *http.Request)
PreviewMapping(rw http.ResponseWriter, r *http.Request)
}

View File

@@ -119,7 +119,11 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
queries := make(map[string]qbtypes.Query)
steps := make(map[string]qbtypes.Step)
missingMetricQueries, metricWarnings, err := q.resolveMetricMetadata(ctx, req.CompositeQuery.Queries, req.Start, req.End)
// Resolve metric metadata once per request: patches each metric-aggregation
// query's spec in place, returns the queries whose every aggregation was
// missing (used for preseeded empty results), and any dormant-metric
// warning string. NotFound errors for never-seen metrics are propagated.
missingMetricQueries, dormantMetricsWarningMsg, err := q.resolveMetricMetadata(ctx, req.CompositeQuery.Queries, req.Start, req.End)
if err != nil {
return nil, err
}
@@ -236,15 +240,13 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
}
}
if len(metricWarnings) > 0 {
if dormantMetricsWarningMsg != "" {
if qbResp.Warning == nil {
qbResp.Warning = &qbtypes.QueryWarnData{}
}
for _, w := range metricWarnings {
qbResp.Warning.Warnings = append(qbResp.Warning.Warnings, qbtypes.QueryWarnDataAdditional{
Message: w,
})
}
qbResp.Warning.Warnings = append(qbResp.Warning.Warnings, qbtypes.QueryWarnDataAdditional{
Message: dormantMetricsWarningMsg,
})
}
}
return qbResp, qbErr
@@ -300,11 +302,12 @@ func (q *querier) populateQBEvent(event *qbtypes.QBEvent, queries []qbtypes.Quer
// - missingMetricQueries: names of queries whose every aggregation was
// missing. Used downstream to preseed empty result placeholders so the
// response still has an entry per requested query name.
// - metricWarnings: human-readable warnings for metrics that could not be
// resolved: never-seen metrics and dormant metrics (seen but no data in
// the query window).
// - err: Internal when a metadata fetch fails.
func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.QueryEnvelope, start, end uint64) (missingMetricQueries []string, metricWarnings []string, err error) {
// - dormantWarning: a human-readable warning describing metrics that exist in
// the store but produced no data within the query window. Empty when no
// such metrics are present.
// - err: NotFound when one or more referenced metrics have never been seen,
// or Internal when a metadata fetch fails.
func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.QueryEnvelope, start, end uint64) (missingMetricQueries []string, dormantWarning string, err error) {
metricNames := make([]string, 0)
for idx := range queries {
if queries[idx].Type != qbtypes.QueryTypeBuilder {
@@ -322,13 +325,13 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.Q
}
if len(metricNames) == 0 {
return nil, nil, nil
return nil, "", nil
}
metricTemporality, metricTypes, err := q.metadataStore.FetchTemporalityAndTypeMulti(ctx, start, end, metricNames...)
if err != nil {
q.logger.WarnContext(ctx, "failed to fetch metric temporality", errors.Attr(err), slog.Any("metrics", metricNames))
return nil, nil, errors.NewInternalf(errors.CodeInternal, "failed to fetch metrics temporality")
return nil, "", errors.NewInternalf(errors.CodeInternal, "failed to fetch metrics temporality")
}
q.logger.DebugContext(ctx, "fetched metric temporalities and types", slog.Any("metric_temporality", metricTemporality), slog.Any("metric_types", metricTypes))
@@ -360,7 +363,7 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.Q
}
// Type is resolved now; validate aggregation compatibility against it.
if err := spec.Aggregations[i].ValidateForType(); err != nil {
return nil, nil, err
return nil, "", err
}
presentAggregations = append(presentAggregations, spec.Aggregations[i])
}
@@ -373,7 +376,7 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.Q
}
if len(missingMetrics) == 0 {
return missingMetricQueries, nil, nil
return missingMetricQueries, "", nil
}
isInternalMetric := func(n string) bool { return strings.HasPrefix(n, "signoz.") || strings.HasPrefix(n, "signoz_") }
@@ -384,33 +387,29 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.Q
}
}
if len(externalMissingMetrics) == 0 {
return missingMetricQueries, nil, nil
// this means all missing metrics are internal, and since internal metrics
// aren't user-controlled, skip errors/warnings for them since users can't act on them
return missingMetricQueries, "", nil
}
// Classify each missing metric: never-seen -> warning with empty result;
// seen-but-no-data-in-window -> dormant warning.
// Classify each missing metric: never-seen → NotFound error; seen-but-no-
// data-in-window dormant warning.
lastSeenInfo, _ := q.metadataStore.FetchLastSeenInfoMulti(ctx, externalMissingMetrics...)
var nonExistentMetrics []string
var dormantMetrics []string
nonExistentMetrics := []string{}
for _, name := range externalMissingMetrics {
if ts, ok := lastSeenInfo[name]; ok && ts > 0 {
dormantMetrics = append(dormantMetrics, name)
continue
}
nonExistentMetrics = append(nonExistentMetrics, name)
}
var warnings []string
// Never-seen metrics: the query already gets a preseeded empty result
// via the aggregation-dropping path above; we just attach a warning.
if len(nonExistentMetrics) == 1 {
warnings = append(warnings, fmt.Sprintf("metric %s has never been received. Check the metric name and instrumentation", nonExistentMetrics[0]))
} else if len(nonExistentMetrics) > 1 {
warnings = append(warnings, fmt.Sprintf("the following metrics have never been received. Check the metric names and instrumentation: %s", strings.Join(nonExistentMetrics, ", ")))
return nil, "", errors.NewNotFoundf(errors.CodeNotFound, "could not find the metric %s", nonExistentMetrics[0])
}
if len(nonExistentMetrics) > 1 {
return nil, "", errors.NewNotFoundf(errors.CodeNotFound, "the following metrics were not found: %s", strings.Join(nonExistentMetrics, ", "))
}
// Dormant metrics: seen before but no data in the query window.
// All missing metrics are dormant — assemble the warning string.
lastSeenStr := func(name string) string {
if ts, ok := lastSeenInfo[name]; ok && ts > 0 {
ago := humanize.RelTime(time.UnixMilli(ts), time.Now(), "ago", "from now")
@@ -418,16 +417,16 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.Q
}
return name
}
if len(dormantMetrics) == 1 {
warnings = append(warnings, fmt.Sprintf("no data found for the metric %s in the query time range", lastSeenStr(dormantMetrics[0])))
} else if len(dormantMetrics) > 1 {
parts := make([]string, len(dormantMetrics))
for i, m := range dormantMetrics {
if len(externalMissingMetrics) == 1 {
dormantWarning = fmt.Sprintf("no data found for the metric %s in the query time range", lastSeenStr(missingMetrics[0]))
} else {
parts := make([]string, len(externalMissingMetrics))
for i, m := range externalMissingMetrics {
parts[i] = lastSeenStr(m)
}
warnings = append(warnings, fmt.Sprintf("no data found for the following metrics in the query time range: %s", strings.Join(parts, ", ")))
dormantWarning = fmt.Sprintf("no data found for the following metrics in the query time range: %s", strings.Join(parts, ", "))
}
return missingMetricQueries, warnings, nil
return missingMetricQueries, dormantWarning, nil
}
func (q *querier) QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream) {

View File

@@ -37,7 +37,7 @@ func (m *mockMetricStmtBuilder) Build(_ context.Context, _, _ uint64, _ qbtypes.
func TestQueryRange_MetricTypeMissing(t *testing.T) {
// When a metric has UnspecifiedType and is not found in the metadata store,
// the querier should return an empty result with a warning instead of an error.
// the querier should return a not-found error, even if the request provides a temporality
providerSettings := instrumentationtest.New().ToProviderSettings()
metadataStore := telemetrytypestest.NewMockMetadataStore()
@@ -80,14 +80,9 @@ func TestQueryRange_MetricTypeMissing(t *testing.T) {
},
}
resp, err := q.QueryRange(context.Background(), valuer.GenerateUUID(), req)
require.NoError(t, err)
require.NotNil(t, resp)
require.NotNil(t, resp.Warning)
require.Len(t, resp.Warning.Warnings, 1)
assert.Contains(t, resp.Warning.Warnings[0].Message, "unknown_metric")
assert.Contains(t, resp.Warning.Warnings[0].Message, "has never been received")
_, err := q.QueryRange(context.Background(), valuer.GenerateUUID(), req)
require.Error(t, err)
assert.Contains(t, err.Error(), "could not find the metric unknown_metric")
}
func TestQueryRange_MetricTypeFromStore(t *testing.T) {

View File

@@ -101,29 +101,9 @@ func (b *MetricQueryStatementBuilder) Build(
return nil, err
}
var pairFallbackWarnings []string
for _, sel := range keySelectors {
if _, ok := keys[sel.Name]; !ok {
keys[sel.Name] = []*telemetrytypes.TelemetryFieldKey{{
Name: sel.Name,
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
Signal: telemetrytypes.SignalMetrics,
}}
pairFallbackWarnings = append(pairFallbackWarnings,
fmt.Sprintf("key `%s` not found on metric %s", sel.Name, query.Aggregations[0].MetricName),
)
}
}
start, end = querybuilder.AdjustedMetricTimeRange(start, end, uint64(query.StepInterval.Seconds()), query)
stmt, err := b.buildPipelineStatement(ctx, start, end, query, keys, variables)
if err != nil {
return nil, err
}
stmt.Warnings = append(stmt.Warnings, pairFallbackWarnings...)
return stmt, nil
return b.buildPipelineStatement(ctx, start, end, query, keys, variables)
}
func (b *MetricQueryStatementBuilder) buildPipelineStatement(

View File

@@ -217,39 +217,6 @@ func TestStatementBuilder(t *testing.T) {
},
expectedErr: nil,
},
{
name: "test_missing_key_falls_back_to_labels",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Type: metrictypes.SumType,
Temporality: metrictypes.Cumulative,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
Filter: &qbtypes.Filter{
Expression: "k8s.statefulset.name = 'my-statefulset'",
},
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "k8s.statefulset.name",
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `k8s.statefulset.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `k8s.statefulset.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'k8s.statefulset.name') AS `k8s.statefulset.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'k8s.statefulset.name') = ? GROUP BY fingerprint, `k8s.statefulset.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `k8s.statefulset.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `k8s.statefulset.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `k8s.statefulset.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `k8s.statefulset.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "my-statefulset", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
Warnings: []string{"key `k8s.statefulset.name` not found on metric signoz_calls_total"},
},
expectedErr: nil,
},
}
fm := NewFieldMapper()

View File

@@ -0,0 +1,117 @@
package spantypes
import (
"strings"
)
// resourceSourcePrefix marks a source that reads from resource attributes:
// buildAttributeRule prefixes those keys with "resource." (e.g. "resource.service.name").
var resourceSourcePrefix = FieldContextResource.StringValue() + "."
type SpanMappingPreviewGroup struct {
Group PostableSpanMapperGroup `json:"group" required:"true"`
Mappers []PostableSpanMapper `json:"mappers" required:"true" nullable:"true"`
}
type SpanMappingPreviewRequest struct {
Span map[string]any `json:"span" nullable:"true"`
Groups []SpanMappingPreviewGroup `json:"groups" nullable:"true"`
GroupID *string `json:"groupId" nullable:"true"`
}
type SpanMappingPreviewResponse struct {
Span map[string]any `json:"span" nullable:"true"`
}
func SimulateMappingForAttributes(groups []*SpanMapperGroupWithMappers, resourceAttrs, spanAttrs map[string]any) (outResource, outSpan map[string]any) {
cfg := buildProcessorConfig(filterEnabledGroupsWithMappers(groups))
outResource = cloneAttrs(resourceAttrs)
outSpan = cloneAttrs(spanAttrs)
applyEnabledGroups(cfg, outSpan, outResource)
return outResource, outSpan
}
func filterEnabledGroupsWithMappers(groups []*SpanMapperGroupWithMappers) []*SpanMapperGroupWithMappers {
out := make([]*SpanMapperGroupWithMappers, 0, len(groups))
for _, gm := range groups {
if gm == nil || gm.Group == nil || !gm.Group.Enabled {
continue
}
enabled := make([]*SpanMapper, 0, len(gm.Mappers))
for _, m := range gm.Mappers {
if m != nil && m.Enabled {
enabled = append(enabled, m)
}
}
if len(enabled) > 0 {
out = append(out, &SpanMapperGroupWithMappers{Group: gm.Group, Mappers: enabled})
}
}
return out
}
func cloneAttrs(in map[string]any) map[string]any {
out := make(map[string]any, len(in))
for k, v := range in {
out[k] = v
}
return out
}
// The functions below are copied from signoz-otel-collector (processor/signozspanmappingprocessor, PR #796):
// TODO(spanmapper-preview): delete them and call the real processor once that PR merges and the dependency is bumped.
func applyEnabledGroups(cfg *spanMapperProcessorConfig, spanAttrs, resourceAttrs map[string]any) {
for i := range cfg.Groups {
g := &cfg.Groups[i]
if !spanMapperConditionMet(g.ExistsAny, spanAttrs, resourceAttrs) {
continue
}
for j := range g.Attributes {
applySpanMapperRule(&g.Attributes[j], spanAttrs, resourceAttrs)
}
}
}
func spanMapperConditionMet(cond spanMapperProcessorExistsAny, spanAttrs, resourceAttrs map[string]any) bool {
return anyKeyContains(spanAttrs, cond.Attributes) || anyKeyContains(resourceAttrs, cond.Resource)
}
func anyKeyContains(attrs map[string]any, patterns []string) bool {
for k := range attrs {
for _, p := range patterns {
if strings.Contains(k, p) {
return true
}
}
}
return false
}
func applySpanMapperRule(rule *spanMapperProcessorAttribute, spanAttrs, resourceAttrs map[string]any) {
dst := spanAttrs
if rule.Context == FieldContextResource.StringValue() {
dst = resourceAttrs
}
for i := range rule.Sources {
src := &rule.Sources[i]
sourceKey, isResource := strings.CutPrefix(src.Key, resourceSourcePrefix)
from := spanAttrs
if isResource {
from = resourceAttrs
}
val, ok := from[sourceKey]
if !ok {
continue
}
dst[rule.Target] = val
if src.Action == SpanMapperOperationMove.StringValue() {
delete(from, sourceKey)
}
return
}
}

View File

@@ -0,0 +1,159 @@
package spantypes
import (
"testing"
"github.com/stretchr/testify/assert"
)
func simGroup(name string, attrCond, resCond []string, mappers ...*SpanMapper) *SpanMapperGroupWithMappers {
return &SpanMapperGroupWithMappers{
Group: &SpanMapperGroup{
Name: name,
Condition: SpanMapperGroupCondition{Attributes: attrCond, Resource: resCond},
Enabled: true,
},
Mappers: mappers,
}
}
func simMapper(target string, ctx FieldContext, sources ...SpanMapperSource) *SpanMapper {
return &SpanMapper{
Name: target,
FieldContext: ctx,
Config: SpanMapperConfig{Sources: sources},
Enabled: true,
}
}
func simAttrSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
return SpanMapperSource{Key: key, Context: FieldContextSpanAttribute, Operation: op, Priority: priority}
}
func simResSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
return SpanMapperSource{Key: key, Context: FieldContextResource, Operation: op, Priority: priority}
}
func TestSimulate_MatchInSpanAttrs(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("llm", []string{"model"}, nil,
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
),
}
_, outSpan := SimulateMappingForAttributes(groups, nil, map[string]any{"llm.model": "gpt-4", "gen_ai.llm.model": "gpt-40"})
assert.Equal(t, "gpt-4", outSpan["gen_ai.request.model"])
}
func TestSimulate_MatchInResourceAttrs(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("llm", nil, []string{"service.name"},
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
simResSrc("service.name", SpanMapperOperationCopy, 1)),
),
}
_, outSpan := SimulateMappingForAttributes(groups, map[string]any{"service.name": "my-llm-service"}, nil)
assert.Equal(t, "my-llm-service", outSpan["gen_ai.request.model"])
}
func TestSimulate_NoMatchSkipsGroup(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("llm", []string{"model"}, nil,
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
),
}
_, outSpan := SimulateMappingForAttributes(groups, nil, map[string]any{"some.other.key": "value"})
_, ok := outSpan["gen_ai.request.model"]
assert.False(t, ok, "target must not be set when condition is not met")
}
func TestSimulate_SourceFirstMatchWins(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("tokens", []string{"llm"}, nil,
simMapper("gen_ai.request.tokens", FieldContextSpanAttribute,
simAttrSrc("gen_ai.request_tokens", SpanMapperOperationCopy, 2),
simAttrSrc("llm.tokens", SpanMapperOperationCopy, 1)),
),
}
_, outSpan := SimulateMappingForAttributes(groups, nil, map[string]any{"gen_ai.request_tokens": "100", "llm.tokens": "200"})
assert.Equal(t, "100", outSpan["gen_ai.request.tokens"])
}
func TestSimulate_SourceFallsBackToSecond(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("tokens", []string{"llm"}, nil,
simMapper("gen_ai.request.tokens", FieldContextSpanAttribute,
simAttrSrc("gen_ai.request_tokens", SpanMapperOperationCopy, 2),
simAttrSrc("llm.tokens", SpanMapperOperationCopy, 1)),
),
}
_, outSpan := SimulateMappingForAttributes(groups, nil, map[string]any{"llm.tokens": "200"})
assert.Equal(t, "200", outSpan["gen_ai.request.tokens"])
}
func TestSimulate_ActionMove(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("input", []string{"gen_ai"}, nil,
simMapper("gen_ai.request.input", FieldContextSpanAttribute,
simAttrSrc("gen_ai.input", SpanMapperOperationMove, 1)),
),
}
_, outSpan := SimulateMappingForAttributes(groups, nil, map[string]any{"gen_ai.input": "hello"})
assert.Equal(t, "hello", outSpan["gen_ai.request.input"])
_, srcPresent := outSpan["gen_ai.input"]
assert.False(t, srcPresent, "source key must be deleted when action=move")
}
func TestSimulate_WriteToResourceContext(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("llm", []string{"llm"}, nil,
simMapper("gen_ai.request.model", FieldContextResource,
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
),
}
outResource, outSpan := SimulateMappingForAttributes(groups, nil, map[string]any{"llm.model": "gpt-4"})
assert.Equal(t, "gpt-4", outResource["gen_ai.request.model"], "target must be written to resource attributes")
_, inSpan := outSpan["gen_ai.request.model"]
assert.False(t, inSpan)
}
func TestSimulate_DisabledGroupsAndMappersSkipped(t *testing.T) {
disabledGroup := simGroup("g1", []string{"llm"}, nil,
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)))
disabledGroup.Group.Enabled = false
_, outSpan := SimulateMappingForAttributes([]*SpanMapperGroupWithMappers{disabledGroup}, nil, map[string]any{"llm.model": "gpt-4"})
_, ok := outSpan["gen_ai.request.model"]
assert.False(t, ok, "disabled groups must not be evaluated")
}
func TestSimulate_NoMappingsReturnsInputUnchanged(t *testing.T) {
outResource, outSpan := SimulateMappingForAttributes(nil, map[string]any{"host.name": "h1"}, map[string]any{"model": "gpt-5"})
assert.Equal(t, map[string]any{"host.name": "h1"}, outResource, "resource attributes returned unchanged")
assert.Equal(t, map[string]any{"model": "gpt-5"}, outSpan, "span attributes returned unchanged")
}
func TestSimulate_DoesNotMutateInput(t *testing.T) {
input := map[string]any{"gen_ai.input": "hi"}
groups := []*SpanMapperGroupWithMappers{
simGroup("input", []string{"gen_ai"}, nil,
simMapper("gen_ai.request.input", FieldContextSpanAttribute,
simAttrSrc("gen_ai.input", SpanMapperOperationMove, 1))),
}
_, _ = SimulateMappingForAttributes(groups, nil, input)
// Original input map must be untouched (move would have deleted the key).
_, ok := input["gen_ai.input"]
assert.True(t, ok, "input map must not be mutated by the preview")
}

View File

@@ -614,7 +614,7 @@ def test_histogram_p90_returns_warning_outside_data_window(
assert warnings[0]["message"].startswith(f"no data found for the metric {metric_name}")
def test_non_existent_metrics_returns_warning(
def test_non_existent_metrics_returns_404(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
@@ -635,11 +635,9 @@ def test_non_existent_metrics_returns_warning(
start_2h = int((now - timedelta(hours=2)).timestamp() * 1000)
response = make_query_request(signoz, token, start_2h, end_ms, [query])
assert response.status_code == HTTPStatus.OK
assert response.status_code == HTTPStatus.NOT_FOUND
data = response.json()
warnings = get_all_warnings(data)
assert any("whatevergoennnsgoeshere" in w["message"] and "has never been received" in w["message"] for w in warnings), f"expected never-seen metric warning, got: {warnings}"
assert get_error_message(response.json()) == "could not find the metric whatevergoennnsgoeshere"
def test_non_existent_internal_metrics_returns_no_warning(