mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-12 11:50:28 +01:00
Compare commits
1 Commits
nv/functio
...
feat/test_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
39e64973aa |
@@ -7185,6 +7185,39 @@ components:
|
||||
- operation
|
||||
- priority
|
||||
type: object
|
||||
SpantypesSpanMappingPreviewRequest:
|
||||
properties:
|
||||
attributes:
|
||||
$ref: '#/components/schemas/SpantypesSpanMappingPreviewSpan'
|
||||
groupId:
|
||||
nullable: true
|
||||
type: string
|
||||
otlpTraces:
|
||||
additionalProperties: {}
|
||||
nullable: true
|
||||
type: object
|
||||
type: object
|
||||
SpantypesSpanMappingPreviewResponse:
|
||||
properties:
|
||||
attributes:
|
||||
$ref: '#/components/schemas/SpantypesSpanMappingPreviewSpan'
|
||||
otlpTraces:
|
||||
additionalProperties: {}
|
||||
nullable: true
|
||||
type: object
|
||||
type: object
|
||||
SpantypesSpanMappingPreviewSpan:
|
||||
nullable: true
|
||||
properties:
|
||||
resourceAttributes:
|
||||
additionalProperties: {}
|
||||
nullable: true
|
||||
type: object
|
||||
spanAttributes:
|
||||
additionalProperties: {}
|
||||
nullable: true
|
||||
type: object
|
||||
type: object
|
||||
SpantypesUpdatableSpanMapper:
|
||||
properties:
|
||||
config:
|
||||
@@ -12790,6 +12823,70 @@ paths:
|
||||
summary: Update a span mapper
|
||||
tags:
|
||||
- spanmapper
|
||||
/api/v1/span_mapper_groups/preview:
|
||||
post:
|
||||
deprecated: false
|
||||
description: Previews how the org's saved attribute mappings would transform
|
||||
a sample span.
|
||||
operationId: PreviewSpanMapping
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/SpantypesSpanMappingPreviewRequest'
|
||||
responses:
|
||||
"200":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
properties:
|
||||
data:
|
||||
$ref: '#/components/schemas/SpantypesSpanMappingPreviewResponse'
|
||||
status:
|
||||
type: string
|
||||
required:
|
||||
- status
|
||||
- data
|
||||
type: object
|
||||
description: OK
|
||||
"400":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Bad Request
|
||||
"401":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Unauthorized
|
||||
"403":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Forbidden
|
||||
"404":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Not Found
|
||||
"500":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Internal Server Error
|
||||
security:
|
||||
- api_key:
|
||||
- VIEWER
|
||||
- tokenizer:
|
||||
- VIEWER
|
||||
summary: Preview span attribute mapping against a sample span
|
||||
tags:
|
||||
- spanmapper
|
||||
/api/v1/testChannel:
|
||||
post:
|
||||
deprecated: true
|
||||
|
||||
@@ -1,101 +0,0 @@
|
||||
package postgressqlschema
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/ee/sqlstore/postgressqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// devenvPostgresDSN is the DSN for the postgres started by `make devenv-postgres`.
|
||||
// Override with TEST_POSTGRES_DSN to point at a different instance.
|
||||
const devenvPostgresDSN = "postgres://postgres:password@localhost:5432/signoz?sslmode=disable"
|
||||
|
||||
// TestSignozDBTagUniqueIndex inspects the real postgres database the enterprise
|
||||
// server migrates and verifies the functional unique index added by migration
|
||||
// 094 on the "tag" table.
|
||||
//
|
||||
// - "MigrationCreatedIndex" is the ground-truth check: it reads the index
|
||||
// definition straight out of pg_indexes and confirms the functional unique
|
||||
// index physically exists. This proves the migration ran and postgres
|
||||
// accepted it.
|
||||
// - "GetIndicesRoundTrip" exercises the engine's GetIndices read-back path and
|
||||
// checks it reconstructs the same index. This is the part your colleague
|
||||
// asked about.
|
||||
//
|
||||
// It mirrors the sqlite signoz.db test, but talks to the devenv postgres
|
||||
// container instead of a local file. The test skips if postgres is unreachable
|
||||
// (run `make devenv-postgres` and the enterprise server first).
|
||||
func TestSignozDBTagUniqueIndex(t *testing.T) {
|
||||
dsn := os.Getenv("TEST_POSTGRES_DSN")
|
||||
if dsn == "" {
|
||||
dsn = devenvPostgresDSN
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
cfg := sqlstore.Config{
|
||||
Provider: "postgres",
|
||||
Postgres: sqlstore.PostgresConfig{DSN: dsn},
|
||||
Connection: sqlstore.ConnectionConfig{MaxOpenConns: 10, MaxConnLifetime: time.Minute},
|
||||
}
|
||||
|
||||
providerSettings := instrumentationtest.New().ToProviderSettings()
|
||||
store, err := postgressqlstore.New(ctx, providerSettings, cfg)
|
||||
if err != nil {
|
||||
t.Skipf("postgres unreachable at %s (run `make devenv-postgres` and the enterprise server): %v", dsn, err)
|
||||
}
|
||||
|
||||
pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
defer cancel()
|
||||
if err := store.SQLDB().PingContext(pingCtx); err != nil {
|
||||
t.Skipf("postgres unreachable at %s (run `make devenv-postgres` and the enterprise server): %v", dsn, err)
|
||||
}
|
||||
t.Logf("using postgres at %s", dsn)
|
||||
|
||||
schema, err := New(ctx, providerSettings, sqlschema.Config{}, store)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := &sqlschema.UniqueIndex{
|
||||
TableName: "tag",
|
||||
ColumnNames: []sqlschema.ColumnName{"org_id", "kind", "key", "value"},
|
||||
Expressions: []string{"org_id", "kind", "LOWER(key)", "LOWER(value)"},
|
||||
}
|
||||
|
||||
t.Run("MigrationCreatedIndex", func(t *testing.T) {
|
||||
var def string
|
||||
err := store.
|
||||
BunDB().
|
||||
NewRaw("SELECT indexdef FROM pg_indexes WHERE tablename = 'tag' AND indexname = ?", expected.Name()).
|
||||
Scan(ctx, &def)
|
||||
require.NoError(t, err, "expected unique index %q to exist in postgres", expected.Name())
|
||||
t.Logf("stored indexdef: %s", def)
|
||||
|
||||
require.Contains(t, def, "UNIQUE")
|
||||
// postgres normalizes function names to lowercase.
|
||||
require.Contains(t, def, "lower(key)")
|
||||
require.Contains(t, def, "lower(value)")
|
||||
})
|
||||
|
||||
t.Run("GetIndicesRoundTrip", func(t *testing.T) {
|
||||
indices, err := schema.GetIndices(ctx, "tag")
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Logf("GetIndices returned %d indices", len(indices))
|
||||
var got sqlschema.Index
|
||||
for _, idx := range indices {
|
||||
t.Logf(" name=%q type=%s columns=%v create=%s", idx.Name(), idx.Type(), idx.Columns(), string(idx.ToCreateSQL(schema.Formatter())))
|
||||
if idx.Name() == expected.Name() {
|
||||
got = idx
|
||||
}
|
||||
}
|
||||
|
||||
require.NotNil(t, got, "GetIndices did not return the functional unique index %q", expected.Name())
|
||||
require.True(t, expected.Equals(got), "round-tripped index should equal the original definition")
|
||||
})
|
||||
}
|
||||
@@ -8440,6 +8440,83 @@ export interface SpantypesSpanMapperDTO {
|
||||
updatedBy?: string;
|
||||
}
|
||||
|
||||
export type SpantypesSpanMappingPreviewRequestDTOOtlpTracesAnyOf = {
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMappingPreviewRequestDTOOtlpTraces =
|
||||
SpantypesSpanMappingPreviewRequestDTOOtlpTracesAnyOf | null;
|
||||
|
||||
export type SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributesAnyOf = {
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributes =
|
||||
SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributesAnyOf | null;
|
||||
|
||||
export type SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributesAnyOf = {
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributes =
|
||||
SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributesAnyOf | null;
|
||||
|
||||
export type SpantypesSpanMappingPreviewSpanDTOAnyOf = {
|
||||
/**
|
||||
* @type object,null
|
||||
*/
|
||||
resourceAttributes?: SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributes;
|
||||
/**
|
||||
* @type object,null
|
||||
*/
|
||||
spanAttributes?: SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributes;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMappingPreviewSpanDTO =
|
||||
SpantypesSpanMappingPreviewSpanDTOAnyOf | null;
|
||||
|
||||
export interface SpantypesSpanMappingPreviewRequestDTO {
|
||||
attributes?: SpantypesSpanMappingPreviewSpanDTO | null;
|
||||
/**
|
||||
* @type string,null
|
||||
*/
|
||||
groupId?: string | null;
|
||||
/**
|
||||
* @type object,null
|
||||
*/
|
||||
otlpTraces?: SpantypesSpanMappingPreviewRequestDTOOtlpTraces;
|
||||
}
|
||||
|
||||
export type SpantypesSpanMappingPreviewResponseDTOOtlpTracesAnyOf = {
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMappingPreviewResponseDTOOtlpTraces =
|
||||
SpantypesSpanMappingPreviewResponseDTOOtlpTracesAnyOf | null;
|
||||
|
||||
export interface SpantypesSpanMappingPreviewResponseDTO {
|
||||
attributes?: SpantypesSpanMappingPreviewSpanDTO | null;
|
||||
/**
|
||||
* @type object,null
|
||||
*/
|
||||
otlpTraces?: SpantypesSpanMappingPreviewResponseDTOOtlpTraces;
|
||||
}
|
||||
|
||||
export interface SpantypesUpdatableSpanMapperDTO {
|
||||
config?: SpantypesSpanMapperConfigDTO;
|
||||
/**
|
||||
@@ -9746,6 +9823,14 @@ export type UpdateSpanMapperPathParameters = {
|
||||
groupId: string;
|
||||
mapperId: string;
|
||||
};
|
||||
export type PreviewSpanMapping200 = {
|
||||
data: SpantypesSpanMappingPreviewResponseDTO;
|
||||
/**
|
||||
* @type string
|
||||
*/
|
||||
status: string;
|
||||
};
|
||||
|
||||
export type GetTraceAggregationsPathParameters = {
|
||||
traceID: string;
|
||||
};
|
||||
|
||||
@@ -27,9 +27,11 @@ import type {
|
||||
ListSpanMapperGroupsParams,
|
||||
ListSpanMappers200,
|
||||
ListSpanMappersPathParameters,
|
||||
PreviewSpanMapping200,
|
||||
RenderErrorResponseDTO,
|
||||
SpantypesPostableSpanMapperDTO,
|
||||
SpantypesPostableSpanMapperGroupDTO,
|
||||
SpantypesSpanMappingPreviewRequestDTO,
|
||||
SpantypesUpdatableSpanMapperDTO,
|
||||
SpantypesUpdatableSpanMapperGroupDTO,
|
||||
UpdateSpanMapperGroupPathParameters,
|
||||
@@ -780,3 +782,86 @@ export const useUpdateSpanMapper = <
|
||||
> => {
|
||||
return useMutation(getUpdateSpanMapperMutationOptions(options));
|
||||
};
|
||||
/**
|
||||
* Previews how the org's saved attribute mappings would transform a sample span.
|
||||
* @summary Preview span attribute mapping against a sample span
|
||||
*/
|
||||
export const previewSpanMapping = (
|
||||
spantypesSpanMappingPreviewRequestDTO?: BodyType<SpantypesSpanMappingPreviewRequestDTO>,
|
||||
signal?: AbortSignal,
|
||||
) => {
|
||||
return GeneratedAPIInstance<PreviewSpanMapping200>({
|
||||
url: `/api/v1/span_mapper_groups/preview`,
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
data: spantypesSpanMappingPreviewRequestDTO,
|
||||
signal,
|
||||
});
|
||||
};
|
||||
|
||||
export const getPreviewSpanMappingMutationOptions = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
TContext = unknown,
|
||||
>(options?: {
|
||||
mutation?: UseMutationOptions<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
|
||||
TContext
|
||||
>;
|
||||
}): UseMutationOptions<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
|
||||
TContext
|
||||
> => {
|
||||
const mutationKey = ['previewSpanMapping'];
|
||||
const { mutation: mutationOptions } = options
|
||||
? options.mutation &&
|
||||
'mutationKey' in options.mutation &&
|
||||
options.mutation.mutationKey
|
||||
? options
|
||||
: { ...options, mutation: { ...options.mutation, mutationKey } }
|
||||
: { mutation: { mutationKey } };
|
||||
|
||||
const mutationFn: MutationFunction<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>,
|
||||
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> }
|
||||
> = (props) => {
|
||||
const { data } = props ?? {};
|
||||
|
||||
return previewSpanMapping(data);
|
||||
};
|
||||
|
||||
return { mutationFn, ...mutationOptions };
|
||||
};
|
||||
|
||||
export type PreviewSpanMappingMutationResult = NonNullable<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>
|
||||
>;
|
||||
export type PreviewSpanMappingMutationBody =
|
||||
| BodyType<SpantypesSpanMappingPreviewRequestDTO>
|
||||
| undefined;
|
||||
export type PreviewSpanMappingMutationError = ErrorType<RenderErrorResponseDTO>;
|
||||
|
||||
/**
|
||||
* @summary Preview span attribute mapping against a sample span
|
||||
*/
|
||||
export const usePreviewSpanMapping = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
TContext = unknown,
|
||||
>(options?: {
|
||||
mutation?: UseMutationOptions<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
|
||||
TContext
|
||||
>;
|
||||
}): UseMutationResult<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
|
||||
TContext
|
||||
> => {
|
||||
return useMutation(getPreviewSpanMappingMutationOptions(options));
|
||||
};
|
||||
|
||||
@@ -51,6 +51,26 @@ func (provider *provider) addSpanMapperRoutes(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v1/span_mapper_groups/preview", handler.New(
|
||||
provider.authzMiddleware.ViewAccess(provider.spanMapperHandler.PreviewMapping),
|
||||
handler.OpenAPIDef{
|
||||
ID: "PreviewSpanMapping",
|
||||
Tags: []string{"spanmapper"},
|
||||
Summary: "Preview span attribute mapping against a sample span",
|
||||
Description: "Previews how the org's saved attribute mappings would transform a sample span.",
|
||||
Request: new(spantypes.SpanMappingPreviewRequest),
|
||||
RequestContentType: "application/json",
|
||||
Response: new(spantypes.SpanMappingPreviewResponse),
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||
Deprecated: false,
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
|
||||
},
|
||||
)).Methods(http.MethodPost).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}", handler.New(
|
||||
provider.authzMiddleware.AdminAccess(provider.spanMapperHandler.UpdateGroup),
|
||||
handler.OpenAPIDef{
|
||||
|
||||
@@ -273,6 +273,35 @@ func (h *handler) DeleteMapper(rw http.ResponseWriter, r *http.Request) {
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
// PreviewMapping handles POST /api/v1/span_mapper_groups/preview.
|
||||
// used to get preview of attributes after remapping.
|
||||
func (h *handler) PreviewMapping(rw http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
req := new(spantypes.SpanMappingPreviewRequest)
|
||||
if err := binding.JSON.BindBody(r.Body, req); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.module.PreviewMapping(ctx, orgID, req)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, result)
|
||||
}
|
||||
|
||||
// groupIDFromPath extracts and validates the {id} or {groupId} path variable.
|
||||
func groupIDFromPath(r *http.Request) (valuer.UUID, error) {
|
||||
vars := mux.Vars(r)
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
@@ -102,6 +103,64 @@ func (module *module) DeleteMapper(ctx context.Context, orgID, groupID, id value
|
||||
return nil
|
||||
}
|
||||
|
||||
// PreviewMapping resolves the org's saved mappings for a sample input
|
||||
// and returns the transformed result.
|
||||
func (module *module) PreviewMapping(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) (*spantypes.SpanMappingPreviewResponse, error) {
|
||||
groups, err := module.resolvePreviewGroups(ctx, orgID, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hasAttrs := req.Attributes != nil
|
||||
hasOTLP := len(req.OtlpTraces) > 0
|
||||
if hasAttrs == hasOTLP {
|
||||
return nil, errors.New(errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "exactly one of 'attributes' or 'otlpTraces' must be provided")
|
||||
}
|
||||
|
||||
if hasAttrs {
|
||||
outResource, outSpan := spantypes.SimulateSpanMapping(groups, req.Attributes.ResourceAttributes, req.Attributes.SpanAttributes)
|
||||
return &spantypes.SpanMappingPreviewResponse{
|
||||
Attributes: &spantypes.SpanMappingPreviewSpan{ResourceAttributes: outResource, SpanAttributes: outSpan},
|
||||
}, nil
|
||||
}
|
||||
|
||||
in, err := json.Marshal(req.OtlpTraces)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "could not serialize otlpTraces payload")
|
||||
}
|
||||
out, err := spantypes.SimulateSpanMappingOTLP(groups, in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var transformed map[string]any
|
||||
if err := json.Unmarshal(out, &transformed); err != nil {
|
||||
return nil, errors.WrapInternalf(err, spantypes.ErrCodeMappingPreviewFailed, "could not deserialize transformed traces")
|
||||
}
|
||||
return &spantypes.SpanMappingPreviewResponse{OtlpTraces: transformed}, nil
|
||||
}
|
||||
|
||||
// resolvePreviewGroups resolves the config to preview against a specific saved
|
||||
// group when GroupID is set, otherwise all the org's enabled saved mappings.
|
||||
func (module *module) resolvePreviewGroups(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) ([]*spantypes.SpanMapperGroupWithMappers, error) {
|
||||
if req.GroupID != nil && *req.GroupID != "" {
|
||||
id, err := valuer.NewUUID(*req.GroupID)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "group id is not a valid uuid")
|
||||
}
|
||||
group, err := module.store.GetGroup(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mappers, err := module.store.ListMappers(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []*spantypes.SpanMapperGroupWithMappers{{Group: group, Mappers: mappers}}, nil
|
||||
}
|
||||
|
||||
return module.listEnabledGroupsWithMappers(ctx, orgID)
|
||||
}
|
||||
|
||||
func (module *module) AgentFeatureType() agentConf.AgentFeatureType {
|
||||
return spantypes.SpanAttrMappingFeatureType
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ type Module interface {
|
||||
CreateMapper(ctx context.Context, orgID, groupID valuer.UUID, mapper *spantypes.SpanMapper) error
|
||||
UpdateMapper(ctx context.Context, orgID, groupID, id valuer.UUID, fieldContext spantypes.FieldContext, config *spantypes.SpanMapperConfig, enabled *bool, updatedBy string) error
|
||||
DeleteMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error
|
||||
PreviewMapping(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) (*spantypes.SpanMappingPreviewResponse, error)
|
||||
}
|
||||
|
||||
// Handler defines the HTTP handler interface for mapping group and mapper endpoints.
|
||||
@@ -42,4 +43,5 @@ type Handler interface {
|
||||
CreateMapper(rw http.ResponseWriter, r *http.Request)
|
||||
UpdateMapper(rw http.ResponseWriter, r *http.Request)
|
||||
DeleteMapper(rw http.ResponseWriter, r *http.Request)
|
||||
PreviewMapping(rw http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
|
||||
@@ -213,7 +213,6 @@ func NewSQLMigrationProviderFactories(
|
||||
sqlmigration.NewCloudIntegrationRemoveCascadeDeleteFactory(sqlschema),
|
||||
sqlmigration.NewAddUserDashboardPreferenceFactory(sqlstore, sqlschema),
|
||||
sqlmigration.NewRecreateUserDashboardPreferenceFactory(sqlstore, sqlschema),
|
||||
sqlmigration.NewAddTagUniqueIndexFactory(sqlstore, sqlschema),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -62,6 +62,8 @@ func (migration *addTags) Up(ctx context.Context, db *bun.DB) error {
|
||||
})
|
||||
sqls = append(sqls, tagTableSQLs...)
|
||||
|
||||
// TODO (@namanverma): add a unique index for tags: (org_id, kind, (LOWER(key)), (LOWER(value)))
|
||||
|
||||
tagRelationsTableSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
|
||||
Name: "tag_relation",
|
||||
Columns: []*sqlschema.Column{
|
||||
|
||||
@@ -1,60 +0,0 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
)
|
||||
|
||||
type addTagUniqueIndex struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
sqlschema sqlschema.SQLSchema
|
||||
}
|
||||
|
||||
func NewAddTagUniqueIndexFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("add_tag_unique_index"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
|
||||
return &addTagUniqueIndex{
|
||||
sqlstore: sqlstore,
|
||||
sqlschema: sqlschema,
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (migration *addTagUniqueIndex) Register(migrations *migrate.Migrations) error {
|
||||
return migrations.Register(migration.Up, migration.Down)
|
||||
}
|
||||
|
||||
func (migration *addTagUniqueIndex) Up(ctx context.Context, db *bun.DB) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = tx.Rollback()
|
||||
}()
|
||||
|
||||
sqls := migration.sqlschema.Operator().CreateIndex(
|
||||
&sqlschema.UniqueIndex{
|
||||
TableName: "tag",
|
||||
ColumnNames: []sqlschema.ColumnName{"org_id", "kind", "key", "value"},
|
||||
Expressions: []string{"org_id", "kind", "LOWER(key)", "LOWER(value)"},
|
||||
},
|
||||
)
|
||||
|
||||
for _, sql := range sqls {
|
||||
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (migration *addTagUniqueIndex) Down(_ context.Context, _ *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
@@ -1,8 +1,6 @@
|
||||
package sqlschema
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
@@ -51,23 +49,9 @@ type Index interface {
|
||||
ToDropSQL(fmter SQLFormatter) []byte
|
||||
}
|
||||
|
||||
// UniqueIndex models a unique index on a table.
|
||||
//
|
||||
// In the common case the index keys on plain columns: set only ColumnNames and
|
||||
// the SQL is emitted with each column identifier-quoted by the formatter
|
||||
// (`CREATE UNIQUE INDEX uq_t_a_b ON t (a, b)`).
|
||||
//
|
||||
// For functional indexes (e.g. case-insensitive uniqueness on `LOWER(col)`),
|
||||
// set Expressions to the raw SQL parts and use ColumnNames as metadata for
|
||||
// "which columns does this index touch". When Expressions is non-empty, it
|
||||
// overrides ColumnNames for SQL emission — each entry is written verbatim, so
|
||||
// the caller owns well-formedness — and the auto-generated name uses a hash
|
||||
// suffix instead of a readable column join because expressions aren't valid
|
||||
// identifier fragments.
|
||||
type UniqueIndex struct {
|
||||
TableName TableName
|
||||
ColumnNames []ColumnName
|
||||
Expressions []string
|
||||
name string
|
||||
}
|
||||
|
||||
@@ -87,28 +71,16 @@ func (index *UniqueIndex) Name() string {
|
||||
}
|
||||
b.WriteString(string(column))
|
||||
}
|
||||
|
||||
if len(index.Expressions) > 0 {
|
||||
if len(index.ColumnNames) > 0 {
|
||||
b.WriteString("_")
|
||||
}
|
||||
hasher := fnv.New32a()
|
||||
_, _ = hasher.Write([]byte(strings.Join(index.Expressions, "\x00")))
|
||||
fmt.Fprintf(&b, "%08x", hasher.Sum32())
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func (index *UniqueIndex) Named(name string) Index {
|
||||
copyOfColumnNames := make([]ColumnName, len(index.ColumnNames))
|
||||
copy(copyOfColumnNames, index.ColumnNames)
|
||||
copyOfExpressions := make([]string, len(index.Expressions))
|
||||
copy(copyOfExpressions, index.Expressions)
|
||||
|
||||
return &UniqueIndex{
|
||||
TableName: index.TableName,
|
||||
ColumnNames: copyOfColumnNames,
|
||||
Expressions: copyOfExpressions,
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
@@ -129,18 +101,7 @@ func (index *UniqueIndex) Equals(other Index) bool {
|
||||
if other.Type() != IndexTypeUnique {
|
||||
return false
|
||||
}
|
||||
otherUnique, ok := other.(*UniqueIndex)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
// Plain and functional indexes produce different SQL even if their column
|
||||
// sets overlap; require both shapes to match.
|
||||
if (len(index.Expressions) == 0) != (len(otherUnique.Expressions) == 0) {
|
||||
return false
|
||||
}
|
||||
if len(index.Expressions) > 0 && !slices.Equal(index.Expressions, otherUnique.Expressions) {
|
||||
return false
|
||||
}
|
||||
|
||||
return index.Name() == other.Name() && slices.Equal(index.Columns(), other.Columns())
|
||||
}
|
||||
|
||||
@@ -153,20 +114,12 @@ func (index *UniqueIndex) ToCreateSQL(fmter SQLFormatter) []byte {
|
||||
sql = fmter.AppendIdent(sql, string(index.TableName))
|
||||
sql = append(sql, " ("...)
|
||||
|
||||
if len(index.Expressions) > 0 {
|
||||
for i, expr := range index.Expressions {
|
||||
if i > 0 {
|
||||
sql = append(sql, ", "...)
|
||||
}
|
||||
sql = append(sql, expr...)
|
||||
}
|
||||
} else {
|
||||
for i, column := range index.ColumnNames {
|
||||
if i > 0 {
|
||||
sql = append(sql, ", "...)
|
||||
}
|
||||
sql = fmter.AppendIdent(sql, string(column))
|
||||
for i, column := range index.ColumnNames {
|
||||
if i > 0 {
|
||||
sql = append(sql, ", "...)
|
||||
}
|
||||
|
||||
sql = fmter.AppendIdent(sql, string(column))
|
||||
}
|
||||
|
||||
sql = append(sql, ")"...)
|
||||
|
||||
@@ -38,43 +38,6 @@ func TestIndexToCreateSQL(t *testing.T) {
|
||||
},
|
||||
sql: `CREATE UNIQUE INDEX IF NOT EXISTS "my_index" ON "users" ("id", "name", "email")`,
|
||||
},
|
||||
{
|
||||
name: "Unique_Functional_SingleExpression",
|
||||
index: &UniqueIndex{
|
||||
TableName: "users",
|
||||
ColumnNames: []ColumnName{"email"},
|
||||
Expressions: []string{"LOWER(email)"},
|
||||
},
|
||||
sql: `CREATE UNIQUE INDEX IF NOT EXISTS "uq_users_email_1e5a87f1" ON "users" (LOWER(email))`,
|
||||
},
|
||||
{
|
||||
name: "Unique_Functional_MixedColumnsAndExpressions",
|
||||
index: &UniqueIndex{
|
||||
TableName: "tag",
|
||||
ColumnNames: []ColumnName{"org_id", "kind", "key", "value"},
|
||||
Expressions: []string{"org_id", "kind", "LOWER(key)", "LOWER(value)"},
|
||||
},
|
||||
sql: `CREATE UNIQUE INDEX IF NOT EXISTS "uq_tag_org_id_kind_key_value_57e8f81f" ON "tag" (org_id, kind, LOWER(key), LOWER(value))`,
|
||||
},
|
||||
{
|
||||
name: "Unique_Functional_ComplexExpression",
|
||||
index: &UniqueIndex{
|
||||
TableName: "users",
|
||||
ColumnNames: []ColumnName{"first_name", "last_name"},
|
||||
Expressions: []string{"LOWER(TRIM(first_name) || ' ' || TRIM(last_name))"},
|
||||
},
|
||||
sql: `CREATE UNIQUE INDEX IF NOT EXISTS "uq_users_first_name_last_name_adb1ff53" ON "users" (LOWER(TRIM(first_name) || ' ' || TRIM(last_name)))`,
|
||||
},
|
||||
{
|
||||
name: "Unique_Functional_Named",
|
||||
index: &UniqueIndex{
|
||||
TableName: "tag",
|
||||
ColumnNames: []ColumnName{"org_id", "kind", "key", "value"},
|
||||
Expressions: []string{"org_id", "kind", "LOWER(key)", "LOWER(value)"},
|
||||
name: "uq_tag_org_kind_lower_key_lower_value",
|
||||
},
|
||||
sql: `CREATE UNIQUE INDEX IF NOT EXISTS "uq_tag_org_kind_lower_key_lower_value" ON "tag" (org_id, kind, LOWER(key), LOWER(value))`,
|
||||
},
|
||||
{
|
||||
name: "PartialUnique_1Column",
|
||||
index: &PartialUniqueIndex{
|
||||
@@ -266,47 +229,6 @@ func TestIndexEquals(t *testing.T) {
|
||||
},
|
||||
equals: false,
|
||||
},
|
||||
{
|
||||
name: "Unique_Functional_Same",
|
||||
a: &UniqueIndex{
|
||||
TableName: "users",
|
||||
ColumnNames: []ColumnName{"email"},
|
||||
Expressions: []string{"LOWER(email)"},
|
||||
},
|
||||
b: &UniqueIndex{
|
||||
TableName: "users",
|
||||
ColumnNames: []ColumnName{"email"},
|
||||
Expressions: []string{"LOWER(email)"},
|
||||
},
|
||||
equals: true,
|
||||
},
|
||||
{
|
||||
name: "Unique_Functional_DifferentExpressions",
|
||||
a: &UniqueIndex{
|
||||
TableName: "users",
|
||||
ColumnNames: []ColumnName{"email"},
|
||||
Expressions: []string{"LOWER(email)"},
|
||||
},
|
||||
b: &UniqueIndex{
|
||||
TableName: "users",
|
||||
ColumnNames: []ColumnName{"email"},
|
||||
Expressions: []string{"UPPER(email)"},
|
||||
},
|
||||
equals: false,
|
||||
},
|
||||
{
|
||||
name: "Unique_Functional_NotEqualToPlainSameColumns",
|
||||
a: &UniqueIndex{
|
||||
TableName: "users",
|
||||
ColumnNames: []ColumnName{"email"},
|
||||
Expressions: []string{"LOWER(email)"},
|
||||
},
|
||||
b: &UniqueIndex{
|
||||
TableName: "users",
|
||||
ColumnNames: []ColumnName{"email"},
|
||||
},
|
||||
equals: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
@@ -316,75 +238,6 @@ func TestIndexEquals(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestUniqueIndexFunctionalName(t *testing.T) {
|
||||
t.Run("autogen uses uq_<table>_<hash>", func(t *testing.T) {
|
||||
idx := &UniqueIndex{
|
||||
TableName: "tag",
|
||||
ColumnNames: []ColumnName{"org_id", "kind", "key", "value"},
|
||||
Expressions: []string{"org_id", "kind", "LOWER(key)", "LOWER(value)"},
|
||||
}
|
||||
assert.Equal(t, "uq_tag_org_id_kind_key_value_57e8f81f", idx.Name())
|
||||
})
|
||||
|
||||
t.Run("same expressions produce the same name", func(t *testing.T) {
|
||||
a := &UniqueIndex{
|
||||
TableName: "users",
|
||||
Expressions: []string{"LOWER(email)"},
|
||||
}
|
||||
b := &UniqueIndex{
|
||||
TableName: "users",
|
||||
Expressions: []string{"LOWER(email)"},
|
||||
}
|
||||
assert.Equal(t, a.Name(), b.Name())
|
||||
})
|
||||
|
||||
t.Run("different expressions produce different names", func(t *testing.T) {
|
||||
a := &UniqueIndex{
|
||||
TableName: "users",
|
||||
Expressions: []string{"LOWER(email)"},
|
||||
}
|
||||
b := &UniqueIndex{
|
||||
TableName: "users",
|
||||
Expressions: []string{"UPPER(email)"},
|
||||
}
|
||||
assert.NotEqual(t, a.Name(), b.Name())
|
||||
})
|
||||
|
||||
t.Run("expressions in different order produce different names", func(t *testing.T) {
|
||||
a := &UniqueIndex{
|
||||
TableName: "tag",
|
||||
Expressions: []string{"org_id", "LOWER(key)"},
|
||||
}
|
||||
b := &UniqueIndex{
|
||||
TableName: "tag",
|
||||
Expressions: []string{"LOWER(key)", "org_id"},
|
||||
}
|
||||
assert.NotEqual(t, a.Name(), b.Name())
|
||||
})
|
||||
|
||||
t.Run("functional autogen differs from plain autogen for same columns", func(t *testing.T) {
|
||||
plain := &UniqueIndex{
|
||||
TableName: "users",
|
||||
ColumnNames: []ColumnName{"email"},
|
||||
}
|
||||
functional := &UniqueIndex{
|
||||
TableName: "users",
|
||||
ColumnNames: []ColumnName{"email"},
|
||||
Expressions: []string{"LOWER(email)"},
|
||||
}
|
||||
assert.Equal(t, "uq_users_email", plain.Name())
|
||||
assert.NotEqual(t, plain.Name(), functional.Name())
|
||||
})
|
||||
|
||||
t.Run("Named() override wins over hash", func(t *testing.T) {
|
||||
idx := (&UniqueIndex{
|
||||
TableName: "tag",
|
||||
Expressions: []string{"org_id", "LOWER(key)"},
|
||||
}).Named("my_functional_index")
|
||||
assert.Equal(t, "my_functional_index", idx.Name())
|
||||
})
|
||||
}
|
||||
|
||||
func TestPartialUniqueIndexName(t *testing.T) {
|
||||
a := &PartialUniqueIndex{
|
||||
TableName: "users",
|
||||
|
||||
@@ -1,111 +0,0 @@
|
||||
package sqlitesqlschema
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore/sqlitesqlstore"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// findSignozDB walks up from the test's working directory looking for a
|
||||
// signoz.db file (the one the community server creates at the repo root).
|
||||
func findSignozDB(t *testing.T) string {
|
||||
t.Helper()
|
||||
|
||||
dir, err := os.Getwd()
|
||||
require.NoError(t, err)
|
||||
|
||||
for {
|
||||
candidate := filepath.Join(dir, "signoz.db")
|
||||
if _, err := os.Stat(candidate); err == nil {
|
||||
return candidate
|
||||
}
|
||||
|
||||
parent := filepath.Dir(dir)
|
||||
if parent == dir {
|
||||
return ""
|
||||
}
|
||||
dir = parent
|
||||
}
|
||||
}
|
||||
|
||||
// TestSignozDBTagUniqueIndex inspects the real signoz.db produced by running the
|
||||
// community server and verifies the functional unique index added by migration
|
||||
// 094 on the "tag" table.
|
||||
//
|
||||
// - "MigrationCreatedIndex" is the ground-truth check: it reads the index DDL
|
||||
// straight out of sqlite_master and confirms the functional unique index
|
||||
// physically exists. This proves the migration ran and sqlite accepted it.
|
||||
// - "GetIndicesRoundTrip" exercises the engine's GetIndices read-back path and
|
||||
// checks it reconstructs the same index. This is the part your colleague
|
||||
// asked about.
|
||||
func TestSignozDBTagUniqueIndex(t *testing.T) {
|
||||
dbPath := findSignozDB(t)
|
||||
if dbPath == "" {
|
||||
t.Skip("signoz.db not found; start the community server first so it creates the file and runs migrations")
|
||||
}
|
||||
t.Logf("using signoz.db at %s", dbPath)
|
||||
|
||||
ctx := context.Background()
|
||||
cfg := sqlstore.Config{
|
||||
Provider: "sqlite",
|
||||
Sqlite: sqlstore.SqliteConfig{
|
||||
Path: dbPath,
|
||||
Mode: "wal",
|
||||
BusyTimeout: 10 * time.Second,
|
||||
TransactionMode: "deferred",
|
||||
},
|
||||
Connection: sqlstore.ConnectionConfig{MaxOpenConns: 10},
|
||||
}
|
||||
|
||||
providerSettings := instrumentationtest.New().ToProviderSettings()
|
||||
store, err := sqlitesqlstore.New(ctx, providerSettings, cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
schema, err := New(ctx, providerSettings, sqlschema.Config{}, store)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := &sqlschema.UniqueIndex{
|
||||
TableName: "tag",
|
||||
ColumnNames: []sqlschema.ColumnName{"org_id", "kind", "key", "value"},
|
||||
Expressions: []string{"org_id", "kind", "LOWER(key)", "LOWER(value)"},
|
||||
}
|
||||
|
||||
t.Run("MigrationCreatedIndex", func(t *testing.T) {
|
||||
var ddl string
|
||||
err := store.
|
||||
BunDB().
|
||||
NewRaw("SELECT sql FROM sqlite_master WHERE type = 'index' AND tbl_name = 'tag' AND name = ?", expected.Name()).
|
||||
Scan(ctx, &ddl)
|
||||
require.NoError(t, err, "expected unique index %q to exist in signoz.db", expected.Name())
|
||||
t.Logf("stored DDL: %s", ddl)
|
||||
|
||||
require.Contains(t, ddl, "UNIQUE")
|
||||
require.Contains(t, ddl, "LOWER(key)")
|
||||
require.Contains(t, ddl, "LOWER(value)")
|
||||
})
|
||||
|
||||
t.Run("GetIndicesRoundTrip", func(t *testing.T) {
|
||||
indices, err := schema.GetIndices(ctx, "tag")
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Logf("GetIndices returned %d indices", len(indices))
|
||||
var got sqlschema.Index
|
||||
for _, idx := range indices {
|
||||
t.Logf(" name=%q type=%s columns=%v create=%s", idx.Name(), idx.Type(), idx.Columns(), string(idx.ToCreateSQL(schema.Formatter())))
|
||||
if idx.Name() == expected.Name() {
|
||||
got = idx
|
||||
}
|
||||
}
|
||||
|
||||
require.NotNil(t, got, "GetIndices did not return the functional unique index %q", expected.Name())
|
||||
require.True(t, expected.Equals(got), "round-tripped index should equal the original definition")
|
||||
})
|
||||
}
|
||||
160
pkg/types/spantypes/spanmappersimulator.go
Normal file
160
pkg/types/spantypes/spanmappersimulator.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"go.opentelemetry.io/collector/pdata/ptrace"
|
||||
)
|
||||
|
||||
var resourceSourcePrefix = FieldContextResource.StringValue() + "."
|
||||
|
||||
var ErrCodeMappingPreviewFailed = errors.MustNewCode("span_attribute_mapping_preview_failed")
|
||||
|
||||
type SpanMappingPreviewSpan struct {
|
||||
ResourceAttributes map[string]any `json:"resourceAttributes" nullable:"true"`
|
||||
SpanAttributes map[string]any `json:"spanAttributes" nullable:"true"`
|
||||
}
|
||||
|
||||
type SpanMappingPreviewRequest struct {
|
||||
Attributes *SpanMappingPreviewSpan `json:"attributes" nullable:"true"`
|
||||
OtlpTraces map[string]any `json:"otlpTraces" nullable:"true"`
|
||||
GroupID *string `json:"groupId" nullable:"true"`
|
||||
}
|
||||
|
||||
type SpanMappingPreviewResponse struct {
|
||||
Attributes *SpanMappingPreviewSpan `json:"attributes,omitempty" nullable:"true"`
|
||||
OtlpTraces map[string]any `json:"otlpTraces,omitempty" nullable:"true"`
|
||||
}
|
||||
|
||||
func SimulateSpanMapping(groups []*SpanMapperGroupWithMappers, resourceAttrs, spanAttrs map[string]any) (outResource, outSpan map[string]any) {
|
||||
cfg := buildProcessorConfig(filterEnabledGroupsWithMappers(groups))
|
||||
|
||||
outResource = cloneAttrs(resourceAttrs)
|
||||
outSpan = cloneAttrs(spanAttrs)
|
||||
|
||||
applyEnabledGroups(cfg, outSpan, outResource)
|
||||
return outResource, outSpan
|
||||
}
|
||||
|
||||
func SimulateSpanMappingOTLP(groups []*SpanMapperGroupWithMappers, otlp []byte) ([]byte, error) {
|
||||
td, err := (&ptrace.JSONUnmarshaler{}).UnmarshalTraces(otlp)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInvalidInputf(err, ErrCodeMappingInvalidInput, "invalid OTLP traces payload")
|
||||
}
|
||||
|
||||
cfg := buildProcessorConfig(filterEnabledGroupsWithMappers(groups))
|
||||
|
||||
rss := td.ResourceSpans()
|
||||
for i := 0; i < rss.Len(); i++ {
|
||||
rs := rss.At(i)
|
||||
resourceAttrs := rs.Resource().Attributes().AsRaw()
|
||||
|
||||
scopeSpans := rs.ScopeSpans()
|
||||
for j := 0; j < scopeSpans.Len(); j++ {
|
||||
spans := scopeSpans.At(j).Spans()
|
||||
for k := 0; k < spans.Len(); k++ {
|
||||
span := spans.At(k)
|
||||
spanAttrs := span.Attributes().AsRaw()
|
||||
applyEnabledGroups(cfg, spanAttrs, resourceAttrs)
|
||||
if err := span.Attributes().FromRaw(spanAttrs); err != nil {
|
||||
return nil, errors.WrapInternalf(err, ErrCodeMappingPreviewFailed, "could not write transformed span attributes")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := rs.Resource().Attributes().FromRaw(resourceAttrs); err != nil {
|
||||
return nil, errors.WrapInternalf(err, ErrCodeMappingPreviewFailed, "could not write transformed resource attributes")
|
||||
}
|
||||
}
|
||||
|
||||
out, err := (&ptrace.JSONMarshaler{}).MarshalTraces(td)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, ErrCodeMappingPreviewFailed, "could not marshal transformed traces")
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// TODO(spanmapper-preview): the apply logic below is a temporary copy of the signozspanmapper processor — remove it and call the real processor once signoz-otel-collector#796 is merged.
|
||||
func applyEnabledGroups(cfg *spanMapperProcessorConfig, spanAttrs, resourceAttrs map[string]any) {
|
||||
for i := range cfg.Groups {
|
||||
g := &cfg.Groups[i]
|
||||
if !spanMapperConditionMet(g.ExistsAny, spanAttrs, resourceAttrs) {
|
||||
continue
|
||||
}
|
||||
for j := range g.Attributes {
|
||||
applySpanMapperRule(&g.Attributes[j], spanAttrs, resourceAttrs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// filterEnabledGroupsWithMappers keeps only enabled groups and their enabled
|
||||
// mappers, dropping groups left with no enabled mappers.
|
||||
func filterEnabledGroupsWithMappers(groups []*SpanMapperGroupWithMappers) []*SpanMapperGroupWithMappers {
|
||||
out := make([]*SpanMapperGroupWithMappers, 0, len(groups))
|
||||
for _, gm := range groups {
|
||||
if gm == nil || gm.Group == nil || !gm.Group.Enabled {
|
||||
continue
|
||||
}
|
||||
enabled := make([]*SpanMapper, 0, len(gm.Mappers))
|
||||
for _, m := range gm.Mappers {
|
||||
if m != nil && m.Enabled {
|
||||
enabled = append(enabled, m)
|
||||
}
|
||||
}
|
||||
if len(enabled) > 0 {
|
||||
out = append(out, &SpanMapperGroupWithMappers{Group: gm.Group, Mappers: enabled})
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func spanMapperConditionMet(cond spanMapperProcessorExistsAny, spanAttrs, resourceAttrs map[string]any) bool {
|
||||
return anyKeyContains(spanAttrs, cond.Attributes) || anyKeyContains(resourceAttrs, cond.Resource)
|
||||
}
|
||||
|
||||
func anyKeyContains(attrs map[string]any, patterns []string) bool {
|
||||
for k := range attrs {
|
||||
for _, p := range patterns {
|
||||
if strings.Contains(k, p) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func applySpanMapperRule(rule *spanMapperProcessorAttribute, spanAttrs, resourceAttrs map[string]any) {
|
||||
dst := spanAttrs
|
||||
if rule.Context == FieldContextResource.StringValue() {
|
||||
dst = resourceAttrs
|
||||
}
|
||||
|
||||
for i := range rule.Sources {
|
||||
src := &rule.Sources[i]
|
||||
bare, isResource := strings.CutPrefix(src.Key, resourceSourcePrefix)
|
||||
|
||||
from := spanAttrs
|
||||
if isResource {
|
||||
from = resourceAttrs
|
||||
}
|
||||
val, ok := from[bare]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
dst[rule.Target] = val
|
||||
if src.Action == SpanMapperOperationMove.StringValue() {
|
||||
delete(from, bare)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func cloneAttrs(in map[string]any) map[string]any {
|
||||
out := make(map[string]any, len(in))
|
||||
for k, v := range in {
|
||||
out[k] = v
|
||||
}
|
||||
return out
|
||||
}
|
||||
192
pkg/types/spantypes/spanmappersimulator_test.go
Normal file
192
pkg/types/spantypes/spanmappersimulator_test.go
Normal file
@@ -0,0 +1,192 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/collector/pdata/ptrace"
|
||||
)
|
||||
|
||||
func simGroup(name string, attrCond, resCond []string, mappers ...*SpanMapper) *SpanMapperGroupWithMappers {
|
||||
return &SpanMapperGroupWithMappers{
|
||||
Group: &SpanMapperGroup{
|
||||
Name: name,
|
||||
Condition: SpanMapperGroupCondition{Attributes: attrCond, Resource: resCond},
|
||||
Enabled: true,
|
||||
},
|
||||
Mappers: mappers,
|
||||
}
|
||||
}
|
||||
|
||||
func simMapper(target string, ctx FieldContext, sources ...SpanMapperSource) *SpanMapper {
|
||||
return &SpanMapper{
|
||||
Name: target,
|
||||
FieldContext: ctx,
|
||||
Config: SpanMapperConfig{Sources: sources},
|
||||
Enabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
func simAttrSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
|
||||
return SpanMapperSource{Key: key, Context: FieldContextSpanAttribute, Operation: op, Priority: priority}
|
||||
}
|
||||
|
||||
func simResSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
|
||||
return SpanMapperSource{Key: key, Context: FieldContextResource, Operation: op, Priority: priority}
|
||||
}
|
||||
|
||||
func TestSimulate_MatchInSpanAttrs(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("llm", []string{"model"}, nil,
|
||||
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
|
||||
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"llm.model": "gpt-4", "gen_ai.llm.model": "gpt-40"})
|
||||
|
||||
assert.Equal(t, "gpt-4", outSpan["gen_ai.request.model"])
|
||||
}
|
||||
|
||||
func TestSimulate_MatchInResourceAttrs(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("llm", nil, []string{"service.name"},
|
||||
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
|
||||
simResSrc("service.name", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, map[string]any{"service.name": "my-llm-service"}, nil)
|
||||
|
||||
assert.Equal(t, "my-llm-service", outSpan["gen_ai.request.model"])
|
||||
}
|
||||
|
||||
func TestSimulate_NoMatchSkipsGroup(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("llm", []string{"model"}, nil,
|
||||
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
|
||||
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"some.other.key": "value"})
|
||||
|
||||
_, ok := outSpan["gen_ai.request.model"]
|
||||
assert.False(t, ok, "target must not be set when condition is not met")
|
||||
}
|
||||
|
||||
func TestSimulate_SourceFirstMatchWins(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("tokens", []string{"llm"}, nil,
|
||||
simMapper("gen_ai.request.tokens", FieldContextSpanAttribute,
|
||||
simAttrSrc("gen_ai.request_tokens", SpanMapperOperationCopy, 2),
|
||||
simAttrSrc("llm.tokens", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"gen_ai.request_tokens": "100", "llm.tokens": "200"})
|
||||
|
||||
assert.Equal(t, "100", outSpan["gen_ai.request.tokens"])
|
||||
}
|
||||
|
||||
func TestSimulate_SourceFallsBackToSecond(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("tokens", []string{"llm"}, nil,
|
||||
simMapper("gen_ai.request.tokens", FieldContextSpanAttribute,
|
||||
simAttrSrc("gen_ai.request_tokens", SpanMapperOperationCopy, 2),
|
||||
simAttrSrc("llm.tokens", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"llm.tokens": "200"})
|
||||
|
||||
assert.Equal(t, "200", outSpan["gen_ai.request.tokens"])
|
||||
}
|
||||
|
||||
func TestSimulate_ActionMove(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("input", []string{"gen_ai"}, nil,
|
||||
simMapper("gen_ai.request.input", FieldContextSpanAttribute,
|
||||
simAttrSrc("gen_ai.input", SpanMapperOperationMove, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"gen_ai.input": "hello"})
|
||||
|
||||
assert.Equal(t, "hello", outSpan["gen_ai.request.input"])
|
||||
_, srcPresent := outSpan["gen_ai.input"]
|
||||
assert.False(t, srcPresent, "source key must be deleted when action=move")
|
||||
}
|
||||
|
||||
func TestSimulate_WriteToResourceContext(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("llm", []string{"llm"}, nil,
|
||||
simMapper("gen_ai.request.model", FieldContextResource,
|
||||
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
outResource, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"llm.model": "gpt-4"})
|
||||
|
||||
assert.Equal(t, "gpt-4", outResource["gen_ai.request.model"], "target must be written to resource attributes")
|
||||
_, inSpan := outSpan["gen_ai.request.model"]
|
||||
assert.False(t, inSpan)
|
||||
}
|
||||
|
||||
func TestSimulate_DisabledGroupsAndMappersSkipped(t *testing.T) {
|
||||
disabledGroup := simGroup("g1", []string{"llm"}, nil,
|
||||
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
|
||||
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)))
|
||||
disabledGroup.Group.Enabled = false
|
||||
|
||||
_, outSpan := SimulateSpanMapping([]*SpanMapperGroupWithMappers{disabledGroup}, nil, map[string]any{"llm.model": "gpt-4"})
|
||||
|
||||
_, ok := outSpan["gen_ai.request.model"]
|
||||
assert.False(t, ok, "disabled groups must not be evaluated")
|
||||
}
|
||||
|
||||
func TestSimulate_DoesNotMutateInput(t *testing.T) {
|
||||
input := map[string]any{"gen_ai.input": "hi"}
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("input", []string{"gen_ai"}, nil,
|
||||
simMapper("gen_ai.request.input", FieldContextSpanAttribute,
|
||||
simAttrSrc("gen_ai.input", SpanMapperOperationMove, 1))),
|
||||
}
|
||||
_, _ = SimulateSpanMapping(groups, nil, input)
|
||||
|
||||
// Original input map must be untouched (move would have deleted the key).
|
||||
_, ok := input["gen_ai.input"]
|
||||
assert.True(t, ok, "input map must not be mutated by the preview")
|
||||
}
|
||||
|
||||
func TestSimulateOTLP_TransformsSpan(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("llm", []string{"model"}, nil,
|
||||
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
|
||||
simAttrSrc("llm.model", SpanMapperOperationMove, 1)),
|
||||
),
|
||||
}
|
||||
|
||||
otlp := []byte(`{
|
||||
"resourceSpans": [{
|
||||
"resource": {"attributes": [{"key": "service.name", "value": {"stringValue": "checkout"}}]},
|
||||
"scopeSpans": [{
|
||||
"spans": [{"attributes": [{"key": "llm.model", "value": {"stringValue": "gpt-4"}}]}]
|
||||
}]
|
||||
}]
|
||||
}`)
|
||||
|
||||
out, err := SimulateSpanMappingOTLP(groups, otlp)
|
||||
require.NoError(t, err)
|
||||
|
||||
td, err := (&ptrace.JSONUnmarshaler{}).UnmarshalTraces(out)
|
||||
require.NoError(t, err)
|
||||
|
||||
span := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
|
||||
v, ok := span.Attributes().Get("gen_ai.request.model")
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, "gpt-4", v.Str())
|
||||
|
||||
// move must have deleted the source.
|
||||
_, srcPresent := span.Attributes().Get("llm.model")
|
||||
assert.False(t, srcPresent)
|
||||
}
|
||||
|
||||
func TestSimulateOTLP_Invalid(t *testing.T) {
|
||||
_, err := SimulateSpanMappingOTLP(nil, []byte(`{ not json`))
|
||||
assert.Error(t, err)
|
||||
}
|
||||
Reference in New Issue
Block a user