Compare commits

..

23 Commits

Author SHA1 Message Date
nityanandagohain
dc63dc0b28 fix: integration with opamp spanmapping 2026-04-24 19:25:32 +05:30
nityanandagohain
fbb47f7fb9 Merge remote-tracking branch 'origin/main' into issue_4361 2026-04-24 11:46:32 +05:30
nityanandagohain
7aa69a3265 fix: use mustnewuuid 2026-04-24 00:16:29 +05:30
nityanandagohain
6de9ebfcac fix: address comments 2026-04-23 22:14:06 +05:30
nityanandagohain
85c7ce0680 fix: address comments 2026-04-23 18:46:32 +05:30
nityanandagohain
910cbf55cf Merge remote-tracking branch 'origin/main' into issue_4361 2026-04-23 16:24:32 +05:30
nityanandagohain
c30b3870f2 fix: address comments 2026-04-23 16:17:36 +05:30
nityanandagohain
a4da71eb52 fix: more changes 2026-04-22 14:05:56 +05:30
nityanandagohain
f5ed4c90b0 fix: address more comments 2026-04-22 13:54:27 +05:30
nityanandagohain
c03e97221a fix: address comments 2026-04-21 23:31:54 +05:30
nityanandagohain
cd224616f0 fix: address comments 2026-04-21 23:19:49 +05:30
nityanandagohain
50b42aca8c fix: lint issues 2026-04-20 16:09:50 +05:30
nityanandagohain
dbad6cff96 Merge remote-tracking branch 'origin/issue_4361' into issue_4361 2026-04-20 16:09:35 +05:30
nityanandagohain
73a44cee77 Merge remote-tracking branch 'origin/main' into issue_4361 2026-04-20 16:09:12 +05:30
nityanandagohain
b9b7c10af9 fix: remove lint issues 2026-04-20 16:09:09 +05:30
Nityananda Gohain
3d71707f0b Merge branch 'main' into issue_4361 2026-04-20 11:02:57 +05:30
nityanandagohain
f59f611ea2 fix: remove stutters 2026-04-20 11:00:38 +05:30
nityanandagohain
07131f10a9 fix: minor changes 2026-04-20 10:52:17 +05:30
nityanandagohain
d930e77870 fix: cleanup 2026-04-20 10:40:36 +05:30
nityanandagohain
8a9f6bee98 Merge remote-tracking branch 'origin/main' into issue_4361 2026-04-20 09:14:50 +05:30
nityanandagohain
4b06b4546f Merge remote-tracking branch 'origin/issue_4361' into issue_4361 2026-04-12 22:02:12 +05:30
nityanandagohain
526b08a179 feat: 1.types and handler for ai-o11y attribute mapping 2026-04-12 17:32:30 +05:30
nityanandagohain
c7ab610bd8 feat: 1.types and handler for ai-o11y attribute mapping 2026-04-12 17:31:40 +05:30
44 changed files with 3509 additions and 2474 deletions

View File

@@ -51,7 +51,6 @@ jobs:
- role
- rootuser
- serviceaccount
- querier_json_body
- ttl
sqlstore-provider:
- postgres
@@ -62,7 +61,7 @@ jobs:
- 25.5.6
- 25.12.5
schema-migrator-version:
- v0.144.3
- v0.142.0
postgres-version:
- 15
if: |

View File

@@ -2926,8 +2926,8 @@ components:
type: object
PromotetypesWrappedIndex:
properties:
fieldDataType:
$ref: '#/components/schemas/TelemetrytypesFieldDataType'
column_type:
type: string
granularity:
type: integer
type:
@@ -4489,6 +4489,184 @@ components:
type: object
Sigv4SigV4Config:
type: object
SpantypesFieldContext:
enum:
- attribute
- resource
type: string
SpantypesGettableSpanMapperGroups:
properties:
items:
items:
$ref: '#/components/schemas/SpantypesSpanMapperGroup'
type: array
required:
- items
type: object
SpantypesPostableSpanMapper:
properties:
config:
$ref: '#/components/schemas/SpantypesSpanMapperConfig'
enabled:
type: boolean
field_context:
$ref: '#/components/schemas/SpantypesFieldContext'
name:
type: string
required:
- name
- field_context
- config
type: object
SpantypesPostableSpanMapperGroup:
properties:
category:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCategory'
condition:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCondition'
enabled:
type: boolean
name:
type: string
required:
- name
- category
- condition
type: object
SpantypesSpanMapper:
properties:
config:
$ref: '#/components/schemas/SpantypesSpanMapperConfig'
createdAt:
format: date-time
type: string
createdBy:
type: string
enabled:
type: boolean
field_context:
$ref: '#/components/schemas/SpantypesFieldContext'
group_id:
type: string
id:
type: string
name:
type: string
updatedAt:
format: date-time
type: string
updatedBy:
type: string
required:
- id
- group_id
- name
- field_context
- config
- enabled
type: object
SpantypesSpanMapperConfig:
properties:
sources:
items:
$ref: '#/components/schemas/SpantypesSpanMapperSource'
nullable: true
type: array
required:
- sources
type: object
SpantypesSpanMapperGroup:
properties:
category:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCategory'
condition:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCondition'
createdAt:
format: date-time
type: string
createdBy:
type: string
enabled:
type: boolean
id:
type: string
name:
type: string
orgId:
type: string
updatedAt:
format: date-time
type: string
updatedBy:
type: string
required:
- id
- orgId
- name
- category
- condition
- enabled
type: object
SpantypesSpanMapperGroupCategory:
type: object
SpantypesSpanMapperGroupCondition:
properties:
attributes:
items:
type: string
nullable: true
type: array
resource:
items:
type: string
nullable: true
type: array
required:
- attributes
- resource
type: object
SpantypesSpanMapperOperation:
enum:
- move
- copy
type: string
SpantypesSpanMapperSource:
properties:
context:
$ref: '#/components/schemas/SpantypesFieldContext'
key:
type: string
operation:
$ref: '#/components/schemas/SpantypesSpanMapperOperation'
priority:
type: integer
required:
- key
- context
- operation
- priority
type: object
SpantypesUpdatableSpanMapper:
properties:
config:
$ref: '#/components/schemas/SpantypesSpanMapperConfig'
enabled:
nullable: true
type: boolean
field_context:
$ref: '#/components/schemas/SpantypesFieldContext'
type: object
SpantypesUpdatableSpanMapperGroup:
properties:
condition:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCondition'
enabled:
nullable: true
type: boolean
name:
nullable: true
type: string
type: object
TelemetrytypesFieldContext:
enum:
- metric
@@ -9231,6 +9409,487 @@ paths:
summary: Updates my service account
tags:
- serviceaccount
/api/v1/span_mapper_groups:
get:
deprecated: false
description: Returns all span attribute mapping groups for the authenticated
org.
operationId: ListSpanMapperGroups
parameters:
- explode: true
in: query
name: category
schema:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCategory'
style: deepObject
- in: query
name: enabled
schema:
nullable: true
type: boolean
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesGettableSpanMapperGroups'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: List span attribute mapping groups
tags:
- spanmapper
post:
deprecated: false
description: Creates a new span attribute mapping group for the org.
operationId: CreateSpanMapperGroup
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesPostableSpanMapperGroup'
responses:
"201":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesSpanMapperGroup'
status:
type: string
required:
- status
- data
type: object
description: Created
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"409":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Conflict
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- ADMIN
- tokenizer:
- ADMIN
summary: Create a span attribute mapping group
tags:
- spanmapper
/api/v1/span_mapper_groups/{groupId}:
delete:
deprecated: false
description: Hard-deletes a mapping group and cascades to all its mappers.
operationId: DeleteSpanMapperGroup
parameters:
- in: path
name: groupId
required: true
schema:
type: string
responses:
"204":
description: No Content
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- ADMIN
- tokenizer:
- ADMIN
summary: Delete a span attribute mapping group
tags:
- spanmapper
patch:
deprecated: false
description: Partially updates an existing mapping group's name, condition,
or enabled state.
operationId: UpdateSpanMapperGroup
parameters:
- in: path
name: groupId
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesUpdatableSpanMapperGroup'
responses:
"204":
description: No Content
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- ADMIN
- tokenizer:
- ADMIN
summary: Update a span attribute mapping group
tags:
- spanmapper
/api/v1/span_mapper_groups/{groupId}/span_mappers:
get:
deprecated: false
description: Returns all mappers belonging to a mapping group.
operationId: ListSpanMappers
parameters:
- in: path
name: groupId
required: true
schema:
type: string
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesGettableSpanMapperGroups'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: List span mappers for a group
tags:
- spanmapper
post:
deprecated: false
description: Adds a new mapper to the specified mapping group.
operationId: CreateSpanMapper
parameters:
- in: path
name: groupId
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesPostableSpanMapper'
responses:
"201":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesSpanMapper'
status:
type: string
required:
- status
- data
type: object
description: Created
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"409":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Conflict
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- ADMIN
- tokenizer:
- ADMIN
summary: Create a span mapper
tags:
- spanmapper
/api/v1/span_mapper_groups/{groupId}/span_mappers/{mapperId}:
delete:
deprecated: false
description: Hard-deletes a mapper from a mapping group.
operationId: DeleteSpanMapper
parameters:
- in: path
name: groupId
required: true
schema:
type: string
- in: path
name: mapperId
required: true
schema:
type: string
responses:
"204":
description: No Content
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- ADMIN
- tokenizer:
- ADMIN
summary: Delete a span mapper
tags:
- spanmapper
patch:
deprecated: false
description: Partially updates an existing mapper's field context, config, or
enabled state.
operationId: UpdateSpanMapper
parameters:
- in: path
name: groupId
required: true
schema:
type: string
- in: path
name: mapperId
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesUpdatableSpanMapper'
responses:
"204":
description: No Content
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- ADMIN
- tokenizer:
- ADMIN
summary: Update a span mapper
tags:
- spanmapper
/api/v1/testChannel:
post:
deprecated: true

View File

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/spanmapper/implspanmapper"
"github.com/gorilla/handlers"
@@ -111,9 +112,11 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
}
// initiate agent config handler
spanAttrMappingFeature := implspanmapper.NewSpanAttrMappingFeature(signoz.Modules.SpanMapper)
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController, spanAttrMappingFeature},
})
if err != nil {
return nil, err

View File

@@ -3696,7 +3696,10 @@ export interface PromotetypesPromotePathDTO {
}
export interface PromotetypesWrappedIndexDTO {
fieldDataType?: TelemetrytypesFieldDataTypeDTO;
/**
* @type string
*/
column_type?: string;
/**
* @type integer
*/
@@ -5467,6 +5470,187 @@ export interface Sigv4SigV4ConfigDTO {
[key: string]: unknown;
}
export enum SpantypesFieldContextDTO {
attribute = 'attribute',
resource = 'resource',
}
export interface SpantypesGettableSpanMapperGroupsDTO {
/**
* @type array
*/
items: SpantypesSpanMapperGroupDTO[];
}
export interface SpantypesPostableSpanMapperDTO {
config: SpantypesSpanMapperConfigDTO;
/**
* @type boolean
*/
enabled?: boolean;
field_context: SpantypesFieldContextDTO;
/**
* @type string
*/
name: string;
}
export interface SpantypesPostableSpanMapperGroupDTO {
category: SpantypesSpanMapperGroupCategoryDTO;
condition: SpantypesSpanMapperGroupConditionDTO;
/**
* @type boolean
*/
enabled?: boolean;
/**
* @type string
*/
name: string;
}
export interface SpantypesSpanMapperDTO {
config: SpantypesSpanMapperConfigDTO;
/**
* @type string
* @format date-time
*/
createdAt?: Date;
/**
* @type string
*/
createdBy?: string;
/**
* @type boolean
*/
enabled: boolean;
field_context: SpantypesFieldContextDTO;
/**
* @type string
*/
group_id: string;
/**
* @type string
*/
id: string;
/**
* @type string
*/
name: string;
/**
* @type string
* @format date-time
*/
updatedAt?: Date;
/**
* @type string
*/
updatedBy?: string;
}
export interface SpantypesSpanMapperConfigDTO {
/**
* @type array
* @nullable true
*/
sources: SpantypesSpanMapperSourceDTO[] | null;
}
export interface SpantypesSpanMapperGroupDTO {
category: SpantypesSpanMapperGroupCategoryDTO;
condition: SpantypesSpanMapperGroupConditionDTO;
/**
* @type string
* @format date-time
*/
createdAt?: Date;
/**
* @type string
*/
createdBy?: string;
/**
* @type boolean
*/
enabled: boolean;
/**
* @type string
*/
id: string;
/**
* @type string
*/
name: string;
/**
* @type string
*/
orgId: string;
/**
* @type string
* @format date-time
*/
updatedAt?: Date;
/**
* @type string
*/
updatedBy?: string;
}
export interface SpantypesSpanMapperGroupCategoryDTO {
[key: string]: unknown;
}
export interface SpantypesSpanMapperGroupConditionDTO {
/**
* @type array
* @nullable true
*/
attributes: string[] | null;
/**
* @type array
* @nullable true
*/
resource: string[] | null;
}
export enum SpantypesSpanMapperOperationDTO {
move = 'move',
copy = 'copy',
}
export interface SpantypesSpanMapperSourceDTO {
context: SpantypesFieldContextDTO;
/**
* @type string
*/
key: string;
operation: SpantypesSpanMapperOperationDTO;
/**
* @type integer
*/
priority: number;
}
export interface SpantypesUpdatableSpanMapperDTO {
config?: SpantypesSpanMapperConfigDTO;
/**
* @type boolean
* @nullable true
*/
enabled?: boolean | null;
field_context?: SpantypesFieldContextDTO;
}
export interface SpantypesUpdatableSpanMapperGroupDTO {
condition?: SpantypesSpanMapperGroupConditionDTO;
/**
* @type boolean
* @nullable true
*/
enabled?: boolean | null;
/**
* @type string
* @nullable true
*/
name?: string | null;
}
export enum TelemetrytypesFieldContextDTO {
metric = 'metric',
log = 'log',
@@ -6917,6 +7101,71 @@ export type GetMyServiceAccount200 = {
status: string;
};
export type ListSpanMapperGroupsParams = {
/**
* @description undefined
*/
category?: SpantypesSpanMapperGroupCategoryDTO;
/**
* @type boolean
* @nullable true
* @description undefined
*/
enabled?: boolean | null;
};
export type ListSpanMapperGroups200 = {
data: SpantypesGettableSpanMapperGroupsDTO;
/**
* @type string
*/
status: string;
};
export type CreateSpanMapperGroup201 = {
data: SpantypesSpanMapperGroupDTO;
/**
* @type string
*/
status: string;
};
export type DeleteSpanMapperGroupPathParameters = {
groupId: string;
};
export type UpdateSpanMapperGroupPathParameters = {
groupId: string;
};
export type ListSpanMappersPathParameters = {
groupId: string;
};
export type ListSpanMappers200 = {
data: SpantypesGettableSpanMapperGroupsDTO;
/**
* @type string
*/
status: string;
};
export type CreateSpanMapperPathParameters = {
groupId: string;
};
export type CreateSpanMapper201 = {
data: SpantypesSpanMapperDTO;
/**
* @type string
*/
status: string;
};
export type DeleteSpanMapperPathParameters = {
groupId: string;
mapperId: string;
};
export type UpdateSpanMapperPathParameters = {
groupId: string;
mapperId: string;
};
export type ListUsersDeprecated200 = {
/**
* @type array

View File

@@ -0,0 +1,787 @@
/**
* ! Do not edit manually
* * The file has been auto-generated using Orval for SigNoz
* * regenerate with 'yarn generate:api'
* SigNoz
*/
import { useMutation, useQuery } from 'react-query';
import type {
InvalidateOptions,
MutationFunction,
QueryClient,
QueryFunction,
QueryKey,
UseMutationOptions,
UseMutationResult,
UseQueryOptions,
UseQueryResult,
} from 'react-query';
import type {
CreateSpanMapper201,
CreateSpanMapperGroup201,
CreateSpanMapperPathParameters,
DeleteSpanMapperGroupPathParameters,
DeleteSpanMapperPathParameters,
ListSpanMapperGroups200,
ListSpanMapperGroupsParams,
ListSpanMappers200,
ListSpanMappersPathParameters,
RenderErrorResponseDTO,
SpantypesPostableSpanMapperDTO,
SpantypesPostableSpanMapperGroupDTO,
SpantypesUpdatableSpanMapperDTO,
SpantypesUpdatableSpanMapperGroupDTO,
UpdateSpanMapperGroupPathParameters,
UpdateSpanMapperPathParameters,
} from '../sigNoz.schemas';
import { GeneratedAPIInstance } from '../../../generatedAPIInstance';
import type { ErrorType, BodyType } from '../../../generatedAPIInstance';
/**
* Returns all span attribute mapping groups for the authenticated org.
* @summary List span attribute mapping groups
*/
export const listSpanMapperGroups = (
params?: ListSpanMapperGroupsParams,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<ListSpanMapperGroups200>({
url: `/api/v1/span_mapper_groups`,
method: 'GET',
params,
signal,
});
};
export const getListSpanMapperGroupsQueryKey = (
params?: ListSpanMapperGroupsParams,
) => {
return [`/api/v1/span_mapper_groups`, ...(params ? [params] : [])] as const;
};
export const getListSpanMapperGroupsQueryOptions = <
TData = Awaited<ReturnType<typeof listSpanMapperGroups>>,
TError = ErrorType<RenderErrorResponseDTO>,
>(
params?: ListSpanMapperGroupsParams,
options?: {
query?: UseQueryOptions<
Awaited<ReturnType<typeof listSpanMapperGroups>>,
TError,
TData
>;
},
) => {
const { query: queryOptions } = options ?? {};
const queryKey =
queryOptions?.queryKey ?? getListSpanMapperGroupsQueryKey(params);
const queryFn: QueryFunction<
Awaited<ReturnType<typeof listSpanMapperGroups>>
> = ({ signal }) => listSpanMapperGroups(params, signal);
return { queryKey, queryFn, ...queryOptions } as UseQueryOptions<
Awaited<ReturnType<typeof listSpanMapperGroups>>,
TError,
TData
> & { queryKey: QueryKey };
};
export type ListSpanMapperGroupsQueryResult = NonNullable<
Awaited<ReturnType<typeof listSpanMapperGroups>>
>;
export type ListSpanMapperGroupsQueryError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary List span attribute mapping groups
*/
export function useListSpanMapperGroups<
TData = Awaited<ReturnType<typeof listSpanMapperGroups>>,
TError = ErrorType<RenderErrorResponseDTO>,
>(
params?: ListSpanMapperGroupsParams,
options?: {
query?: UseQueryOptions<
Awaited<ReturnType<typeof listSpanMapperGroups>>,
TError,
TData
>;
},
): UseQueryResult<TData, TError> & { queryKey: QueryKey } {
const queryOptions = getListSpanMapperGroupsQueryOptions(params, options);
const query = useQuery(queryOptions) as UseQueryResult<TData, TError> & {
queryKey: QueryKey;
};
query.queryKey = queryOptions.queryKey;
return query;
}
/**
* @summary List span attribute mapping groups
*/
export const invalidateListSpanMapperGroups = async (
queryClient: QueryClient,
params?: ListSpanMapperGroupsParams,
options?: InvalidateOptions,
): Promise<QueryClient> => {
await queryClient.invalidateQueries(
{ queryKey: getListSpanMapperGroupsQueryKey(params) },
options,
);
return queryClient;
};
/**
* Creates a new span attribute mapping group for the org.
* @summary Create a span attribute mapping group
*/
export const createSpanMapperGroup = (
spantypesPostableSpanMapperGroupDTO: BodyType<SpantypesPostableSpanMapperGroupDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<CreateSpanMapperGroup201>({
url: `/api/v1/span_mapper_groups`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: spantypesPostableSpanMapperGroupDTO,
signal,
});
};
export const getCreateSpanMapperGroupMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof createSpanMapperGroup>>,
TError,
{ data: BodyType<SpantypesPostableSpanMapperGroupDTO> },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof createSpanMapperGroup>>,
TError,
{ data: BodyType<SpantypesPostableSpanMapperGroupDTO> },
TContext
> => {
const mutationKey = ['createSpanMapperGroup'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof createSpanMapperGroup>>,
{ data: BodyType<SpantypesPostableSpanMapperGroupDTO> }
> = (props) => {
const { data } = props ?? {};
return createSpanMapperGroup(data);
};
return { mutationFn, ...mutationOptions };
};
export type CreateSpanMapperGroupMutationResult = NonNullable<
Awaited<ReturnType<typeof createSpanMapperGroup>>
>;
export type CreateSpanMapperGroupMutationBody =
BodyType<SpantypesPostableSpanMapperGroupDTO>;
export type CreateSpanMapperGroupMutationError =
ErrorType<RenderErrorResponseDTO>;
/**
* @summary Create a span attribute mapping group
*/
export const useCreateSpanMapperGroup = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof createSpanMapperGroup>>,
TError,
{ data: BodyType<SpantypesPostableSpanMapperGroupDTO> },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof createSpanMapperGroup>>,
TError,
{ data: BodyType<SpantypesPostableSpanMapperGroupDTO> },
TContext
> => {
const mutationOptions = getCreateSpanMapperGroupMutationOptions(options);
return useMutation(mutationOptions);
};
/**
* Hard-deletes a mapping group and cascades to all its mappers.
* @summary Delete a span attribute mapping group
*/
export const deleteSpanMapperGroup = ({
groupId,
}: DeleteSpanMapperGroupPathParameters) => {
return GeneratedAPIInstance<void>({
url: `/api/v1/span_mapper_groups/${groupId}`,
method: 'DELETE',
});
};
export const getDeleteSpanMapperGroupMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof deleteSpanMapperGroup>>,
TError,
{ pathParams: DeleteSpanMapperGroupPathParameters },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof deleteSpanMapperGroup>>,
TError,
{ pathParams: DeleteSpanMapperGroupPathParameters },
TContext
> => {
const mutationKey = ['deleteSpanMapperGroup'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof deleteSpanMapperGroup>>,
{ pathParams: DeleteSpanMapperGroupPathParameters }
> = (props) => {
const { pathParams } = props ?? {};
return deleteSpanMapperGroup(pathParams);
};
return { mutationFn, ...mutationOptions };
};
export type DeleteSpanMapperGroupMutationResult = NonNullable<
Awaited<ReturnType<typeof deleteSpanMapperGroup>>
>;
export type DeleteSpanMapperGroupMutationError =
ErrorType<RenderErrorResponseDTO>;
/**
* @summary Delete a span attribute mapping group
*/
export const useDeleteSpanMapperGroup = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof deleteSpanMapperGroup>>,
TError,
{ pathParams: DeleteSpanMapperGroupPathParameters },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof deleteSpanMapperGroup>>,
TError,
{ pathParams: DeleteSpanMapperGroupPathParameters },
TContext
> => {
const mutationOptions = getDeleteSpanMapperGroupMutationOptions(options);
return useMutation(mutationOptions);
};
/**
* Partially updates an existing mapping group's name, condition, or enabled state.
* @summary Update a span attribute mapping group
*/
export const updateSpanMapperGroup = (
{ groupId }: UpdateSpanMapperGroupPathParameters,
spantypesUpdatableSpanMapperGroupDTO: BodyType<SpantypesUpdatableSpanMapperGroupDTO>,
) => {
return GeneratedAPIInstance<void>({
url: `/api/v1/span_mapper_groups/${groupId}`,
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
data: spantypesUpdatableSpanMapperGroupDTO,
});
};
export const getUpdateSpanMapperGroupMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof updateSpanMapperGroup>>,
TError,
{
pathParams: UpdateSpanMapperGroupPathParameters;
data: BodyType<SpantypesUpdatableSpanMapperGroupDTO>;
},
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof updateSpanMapperGroup>>,
TError,
{
pathParams: UpdateSpanMapperGroupPathParameters;
data: BodyType<SpantypesUpdatableSpanMapperGroupDTO>;
},
TContext
> => {
const mutationKey = ['updateSpanMapperGroup'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof updateSpanMapperGroup>>,
{
pathParams: UpdateSpanMapperGroupPathParameters;
data: BodyType<SpantypesUpdatableSpanMapperGroupDTO>;
}
> = (props) => {
const { pathParams, data } = props ?? {};
return updateSpanMapperGroup(pathParams, data);
};
return { mutationFn, ...mutationOptions };
};
export type UpdateSpanMapperGroupMutationResult = NonNullable<
Awaited<ReturnType<typeof updateSpanMapperGroup>>
>;
export type UpdateSpanMapperGroupMutationBody =
BodyType<SpantypesUpdatableSpanMapperGroupDTO>;
export type UpdateSpanMapperGroupMutationError =
ErrorType<RenderErrorResponseDTO>;
/**
* @summary Update a span attribute mapping group
*/
export const useUpdateSpanMapperGroup = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof updateSpanMapperGroup>>,
TError,
{
pathParams: UpdateSpanMapperGroupPathParameters;
data: BodyType<SpantypesUpdatableSpanMapperGroupDTO>;
},
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof updateSpanMapperGroup>>,
TError,
{
pathParams: UpdateSpanMapperGroupPathParameters;
data: BodyType<SpantypesUpdatableSpanMapperGroupDTO>;
},
TContext
> => {
const mutationOptions = getUpdateSpanMapperGroupMutationOptions(options);
return useMutation(mutationOptions);
};
/**
* Returns all mappers belonging to a mapping group.
* @summary List span mappers for a group
*/
export const listSpanMappers = (
{ groupId }: ListSpanMappersPathParameters,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<ListSpanMappers200>({
url: `/api/v1/span_mapper_groups/${groupId}/span_mappers`,
method: 'GET',
signal,
});
};
export const getListSpanMappersQueryKey = ({
groupId,
}: ListSpanMappersPathParameters) => {
return [`/api/v1/span_mapper_groups/${groupId}/span_mappers`] as const;
};
export const getListSpanMappersQueryOptions = <
TData = Awaited<ReturnType<typeof listSpanMappers>>,
TError = ErrorType<RenderErrorResponseDTO>,
>(
{ groupId }: ListSpanMappersPathParameters,
options?: {
query?: UseQueryOptions<
Awaited<ReturnType<typeof listSpanMappers>>,
TError,
TData
>;
},
) => {
const { query: queryOptions } = options ?? {};
const queryKey =
queryOptions?.queryKey ?? getListSpanMappersQueryKey({ groupId });
const queryFn: QueryFunction<Awaited<ReturnType<typeof listSpanMappers>>> = ({
signal,
}) => listSpanMappers({ groupId }, signal);
return {
queryKey,
queryFn,
enabled: !!groupId,
...queryOptions,
} as UseQueryOptions<
Awaited<ReturnType<typeof listSpanMappers>>,
TError,
TData
> & { queryKey: QueryKey };
};
export type ListSpanMappersQueryResult = NonNullable<
Awaited<ReturnType<typeof listSpanMappers>>
>;
export type ListSpanMappersQueryError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary List span mappers for a group
*/
export function useListSpanMappers<
TData = Awaited<ReturnType<typeof listSpanMappers>>,
TError = ErrorType<RenderErrorResponseDTO>,
>(
{ groupId }: ListSpanMappersPathParameters,
options?: {
query?: UseQueryOptions<
Awaited<ReturnType<typeof listSpanMappers>>,
TError,
TData
>;
},
): UseQueryResult<TData, TError> & { queryKey: QueryKey } {
const queryOptions = getListSpanMappersQueryOptions({ groupId }, options);
const query = useQuery(queryOptions) as UseQueryResult<TData, TError> & {
queryKey: QueryKey;
};
query.queryKey = queryOptions.queryKey;
return query;
}
/**
* @summary List span mappers for a group
*/
export const invalidateListSpanMappers = async (
queryClient: QueryClient,
{ groupId }: ListSpanMappersPathParameters,
options?: InvalidateOptions,
): Promise<QueryClient> => {
await queryClient.invalidateQueries(
{ queryKey: getListSpanMappersQueryKey({ groupId }) },
options,
);
return queryClient;
};
/**
* Adds a new mapper to the specified mapping group.
* @summary Create a span mapper
*/
export const createSpanMapper = (
{ groupId }: CreateSpanMapperPathParameters,
spantypesPostableSpanMapperDTO: BodyType<SpantypesPostableSpanMapperDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<CreateSpanMapper201>({
url: `/api/v1/span_mapper_groups/${groupId}/span_mappers`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: spantypesPostableSpanMapperDTO,
signal,
});
};
export const getCreateSpanMapperMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof createSpanMapper>>,
TError,
{
pathParams: CreateSpanMapperPathParameters;
data: BodyType<SpantypesPostableSpanMapperDTO>;
},
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof createSpanMapper>>,
TError,
{
pathParams: CreateSpanMapperPathParameters;
data: BodyType<SpantypesPostableSpanMapperDTO>;
},
TContext
> => {
const mutationKey = ['createSpanMapper'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof createSpanMapper>>,
{
pathParams: CreateSpanMapperPathParameters;
data: BodyType<SpantypesPostableSpanMapperDTO>;
}
> = (props) => {
const { pathParams, data } = props ?? {};
return createSpanMapper(pathParams, data);
};
return { mutationFn, ...mutationOptions };
};
export type CreateSpanMapperMutationResult = NonNullable<
Awaited<ReturnType<typeof createSpanMapper>>
>;
export type CreateSpanMapperMutationBody =
BodyType<SpantypesPostableSpanMapperDTO>;
export type CreateSpanMapperMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Create a span mapper
*/
export const useCreateSpanMapper = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof createSpanMapper>>,
TError,
{
pathParams: CreateSpanMapperPathParameters;
data: BodyType<SpantypesPostableSpanMapperDTO>;
},
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof createSpanMapper>>,
TError,
{
pathParams: CreateSpanMapperPathParameters;
data: BodyType<SpantypesPostableSpanMapperDTO>;
},
TContext
> => {
const mutationOptions = getCreateSpanMapperMutationOptions(options);
return useMutation(mutationOptions);
};
/**
* Hard-deletes a mapper from a mapping group.
* @summary Delete a span mapper
*/
export const deleteSpanMapper = ({
groupId,
mapperId,
}: DeleteSpanMapperPathParameters) => {
return GeneratedAPIInstance<void>({
url: `/api/v1/span_mapper_groups/${groupId}/span_mappers/${mapperId}`,
method: 'DELETE',
});
};
export const getDeleteSpanMapperMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof deleteSpanMapper>>,
TError,
{ pathParams: DeleteSpanMapperPathParameters },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof deleteSpanMapper>>,
TError,
{ pathParams: DeleteSpanMapperPathParameters },
TContext
> => {
const mutationKey = ['deleteSpanMapper'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof deleteSpanMapper>>,
{ pathParams: DeleteSpanMapperPathParameters }
> = (props) => {
const { pathParams } = props ?? {};
return deleteSpanMapper(pathParams);
};
return { mutationFn, ...mutationOptions };
};
export type DeleteSpanMapperMutationResult = NonNullable<
Awaited<ReturnType<typeof deleteSpanMapper>>
>;
export type DeleteSpanMapperMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Delete a span mapper
*/
export const useDeleteSpanMapper = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof deleteSpanMapper>>,
TError,
{ pathParams: DeleteSpanMapperPathParameters },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof deleteSpanMapper>>,
TError,
{ pathParams: DeleteSpanMapperPathParameters },
TContext
> => {
const mutationOptions = getDeleteSpanMapperMutationOptions(options);
return useMutation(mutationOptions);
};
/**
* Partially updates an existing mapper's field context, config, or enabled state.
* @summary Update a span mapper
*/
export const updateSpanMapper = (
{ groupId, mapperId }: UpdateSpanMapperPathParameters,
spantypesUpdatableSpanMapperDTO: BodyType<SpantypesUpdatableSpanMapperDTO>,
) => {
return GeneratedAPIInstance<void>({
url: `/api/v1/span_mapper_groups/${groupId}/span_mappers/${mapperId}`,
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
data: spantypesUpdatableSpanMapperDTO,
});
};
export const getUpdateSpanMapperMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof updateSpanMapper>>,
TError,
{
pathParams: UpdateSpanMapperPathParameters;
data: BodyType<SpantypesUpdatableSpanMapperDTO>;
},
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof updateSpanMapper>>,
TError,
{
pathParams: UpdateSpanMapperPathParameters;
data: BodyType<SpantypesUpdatableSpanMapperDTO>;
},
TContext
> => {
const mutationKey = ['updateSpanMapper'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof updateSpanMapper>>,
{
pathParams: UpdateSpanMapperPathParameters;
data: BodyType<SpantypesUpdatableSpanMapperDTO>;
}
> = (props) => {
const { pathParams, data } = props ?? {};
return updateSpanMapper(pathParams, data);
};
return { mutationFn, ...mutationOptions };
};
export type UpdateSpanMapperMutationResult = NonNullable<
Awaited<ReturnType<typeof updateSpanMapper>>
>;
export type UpdateSpanMapperMutationBody =
BodyType<SpantypesUpdatableSpanMapperDTO>;
export type UpdateSpanMapperMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Update a span mapper
*/
export const useUpdateSpanMapper = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof updateSpanMapper>>,
TError,
{
pathParams: UpdateSpanMapperPathParameters;
data: BodyType<SpantypesUpdatableSpanMapperDTO>;
},
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof updateSpanMapper>>,
TError,
{
pathParams: UpdateSpanMapperPathParameters;
data: BodyType<SpantypesUpdatableSpanMapperDTO>;
},
TContext
> => {
const mutationOptions = getUpdateSpanMapperMutationOptions(options);
return useMutation(mutationOptions);
};

2
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
github.com/SigNoz/signoz-otel-collector v0.144.3
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/cespare/xxhash/v2 v2.3.0

4
go.sum
View File

@@ -108,8 +108,8 @@ github.com/SigNoz/expr v1.17.7-beta h1:FyZkleM5dTQ0O6muQfwGpoH5A2ohmN/XTasRCO72g
github.com/SigNoz/expr v1.17.7-beta/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkbj57eGXx8H3ZJ4zhmQXBnrW523ktj8=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
github.com/SigNoz/signoz-otel-collector v0.144.3 h1:/7PPIqIpPsaWtrgnfHam2XVYP41ZlgEKLHzQO8oVxcA=
github.com/SigNoz/signoz-otel-collector v0.144.3/go.mod h1:9pLVpcIQvUT88ZiNnZN/MI+XLruvwC+Xm2UzPmGjNfA=
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4 h1:EskJkEMfuuIyArWhV8SleDV/fuKxiaEGTXrCZIFqDT4=
github.com/SigNoz/signoz-otel-collector v0.144.3-rc.4/go.mod h1:9pLVpcIQvUT88ZiNnZN/MI+XLruvwC+Xm2UzPmGjNfA=
github.com/Yiling-J/theine-go v0.6.2 h1:1GeoXeQ0O0AUkiwj2S9Jc0Mzx+hpqzmqsJ4kIC4M9AY=
github.com/Yiling-J/theine-go v0.6.2/go.mod h1:08QpMa5JZ2pKN+UJCRrCasWYO1IKCdl54Xa836rpmDU=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=

View File

@@ -16,8 +16,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/modules/promote"
@@ -25,6 +25,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
"github.com/SigNoz/signoz/pkg/modules/session"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/modules/tracedetail"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/querier"
@@ -62,6 +63,7 @@ type provider struct {
factoryHandler factory.Handler
cloudIntegrationHandler cloudintegration.Handler
ruleStateHistoryHandler rulestatehistory.Handler
spanMapperHandler spanmapper.Handler
alertmanagerHandler alertmanager.Handler
traceDetailHandler tracedetail.Handler
rulerHandler ruler.Handler
@@ -92,6 +94,7 @@ func NewFactory(
factoryHandler factory.Handler,
cloudIntegrationHandler cloudintegration.Handler,
ruleStateHistoryHandler rulestatehistory.Handler,
spanMapperHandler spanmapper.Handler,
alertmanagerHandler alertmanager.Handler,
traceDetailHandler tracedetail.Handler,
rulerHandler ruler.Handler,
@@ -125,6 +128,7 @@ func NewFactory(
factoryHandler,
cloudIntegrationHandler,
ruleStateHistoryHandler,
spanMapperHandler,
alertmanagerHandler,
traceDetailHandler,
rulerHandler,
@@ -160,6 +164,7 @@ func newProvider(
factoryHandler factory.Handler,
cloudIntegrationHandler cloudintegration.Handler,
ruleStateHistoryHandler rulestatehistory.Handler,
spanMapperHandler spanmapper.Handler,
alertmanagerHandler alertmanager.Handler,
traceDetailHandler tracedetail.Handler,
rulerHandler ruler.Handler,
@@ -193,6 +198,7 @@ func newProvider(
factoryHandler: factoryHandler,
cloudIntegrationHandler: cloudIntegrationHandler,
ruleStateHistoryHandler: ruleStateHistoryHandler,
spanMapperHandler: spanMapperHandler,
alertmanagerHandler: alertmanagerHandler,
traceDetailHandler: traceDetailHandler,
rulerHandler: rulerHandler,
@@ -300,6 +306,10 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addSpanMapperRoutes(router); err != nil {
return err
}
if err := provider.addAlertmanagerRoutes(router); err != nil {
return err
}

View File

@@ -0,0 +1,171 @@
package signozapiserver
import (
"net/http"
"github.com/SigNoz/signoz/pkg/http/handler"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/gorilla/mux"
)
func (provider *provider) addSpanMapperRoutes(router *mux.Router) error {
if err := router.Handle("/api/v1/span_mapper_groups", handler.New(
provider.authZ.ViewAccess(provider.spanMapperHandler.ListGroups),
handler.OpenAPIDef{
ID: "ListSpanMapperGroups",
Tags: []string{"spanmapper"},
Summary: "List span attribute mapping groups",
Description: "Returns all span attribute mapping groups for the authenticated org.",
Request: nil,
RequestContentType: "",
RequestQuery: new(spantypes.ListSpanMapperGroupsQuery),
Response: new(spantypes.GettableSpanMapperGroups),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodGet).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups", handler.New(
provider.authZ.AdminAccess(provider.spanMapperHandler.CreateGroup),
handler.OpenAPIDef{
ID: "CreateSpanMapperGroup",
Tags: []string{"spanmapper"},
Summary: "Create a span attribute mapping group",
Description: "Creates a new span attribute mapping group for the org.",
Request: new(spantypes.PostableSpanMapperGroup),
RequestContentType: "application/json",
Response: new(spantypes.GettableSpanMapperGroup),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusCreated,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusConflict},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}", handler.New(
provider.authZ.AdminAccess(provider.spanMapperHandler.UpdateGroup),
handler.OpenAPIDef{
ID: "UpdateSpanMapperGroup",
Tags: []string{"spanmapper"},
Summary: "Update a span attribute mapping group",
Description: "Partially updates an existing mapping group's name, condition, or enabled state.",
Request: new(spantypes.UpdatableSpanMapperGroup),
RequestContentType: "application/json",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
},
)).Methods(http.MethodPatch).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}", handler.New(
provider.authZ.AdminAccess(provider.spanMapperHandler.DeleteGroup),
handler.OpenAPIDef{
ID: "DeleteSpanMapperGroup",
Tags: []string{"spanmapper"},
Summary: "Delete a span attribute mapping group",
Description: "Hard-deletes a mapping group and cascades to all its mappers.",
Request: nil,
RequestContentType: "",
Response: nil,
ResponseContentType: "",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
},
)).Methods(http.MethodDelete).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}/span_mappers", handler.New(
provider.authZ.ViewAccess(provider.spanMapperHandler.ListMappers),
handler.OpenAPIDef{
ID: "ListSpanMappers",
Tags: []string{"spanmapper"},
Summary: "List span mappers for a group",
Description: "Returns all mappers belonging to a mapping group.",
Request: nil,
RequestContentType: "",
Response: new(spantypes.GettableSpanMapperGroups),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodGet).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}/span_mappers", handler.New(
provider.authZ.AdminAccess(provider.spanMapperHandler.CreateMapper),
handler.OpenAPIDef{
ID: "CreateSpanMapper",
Tags: []string{"spanmapper"},
Summary: "Create a span mapper",
Description: "Adds a new mapper to the specified mapping group.",
Request: new(spantypes.PostableSpanMapper),
RequestContentType: "application/json",
Response: new(spantypes.GettableSpanMapper),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusCreated,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound, http.StatusConflict},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}/span_mappers/{mapperId}", handler.New(
provider.authZ.AdminAccess(provider.spanMapperHandler.UpdateMapper),
handler.OpenAPIDef{
ID: "UpdateSpanMapper",
Tags: []string{"spanmapper"},
Summary: "Update a span mapper",
Description: "Partially updates an existing mapper's field context, config, or enabled state.",
Request: new(spantypes.UpdatableSpanMapper),
RequestContentType: "application/json",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
},
)).Methods(http.MethodPatch).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}/span_mappers/{mapperId}", handler.New(
provider.authZ.AdminAccess(provider.spanMapperHandler.DeleteMapper),
handler.OpenAPIDef{
ID: "DeleteSpanMapper",
Tags: []string{"spanmapper"},
Summary: "Delete a span mapper",
Description: "Hard-deletes a mapper from a mapping group.",
Request: nil,
RequestContentType: "",
Response: nil,
ResponseContentType: "",
SuccessStatusCode: http.StatusNoContent,
ErrorStatusCodes: []int{http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
},
)).Methods(http.MethodDelete).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -32,17 +32,27 @@ func NewModule(metadataStore telemetrytypes.MetadataStore, telemetrystore teleme
}
func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetypes.PromotePath, error) {
indexes, err := m.metadataStore.ListLogsJSONIndexes(ctx)
logsIndexes, err := m.metadataStore.ListLogsJSONIndexes(ctx)
if err != nil {
return nil, err
}
// Flatten the map values (which are slices) into a single slice
indexes := slices.Concat(slices.Collect(maps.Values(logsIndexes))...)
aggr := map[string][]promotetypes.WrappedIndex{}
for _, index := range indexes {
aggr[index.Name] = append(aggr[index.Name], promotetypes.WrappedIndex{
FieldDataType: index.FieldDataType,
Type: index.IndexType,
Granularity: index.Granularity,
path, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
if err != nil {
return nil, err
}
// clean backticks from the path
path = strings.ReplaceAll(path, "`", "")
aggr[path] = append(aggr[path], promotetypes.WrappedIndex{
ColumnType: columnType,
Type: index.Type,
Granularity: index.Granularity,
})
}
promotedPaths, err := m.listPromotedPaths(ctx)

View File

@@ -0,0 +1,85 @@
package implspanmapper
import (
"context"
"encoding/json"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
const SpanAttrMappingFeatureType agentConf.AgentFeatureType = "span_attr_mapping"
// SpanAttrMappingFeature implements agentConf.AgentFeature. It reads enabled
// mapping groups and mappers from the module and generates the
// signozspanmappingprocessor config for deployment via OpAMP.
type SpanAttrMappingFeature struct {
module spanmapper.Module
}
func NewSpanAttrMappingFeature(module spanmapper.Module) *SpanAttrMappingFeature {
return &SpanAttrMappingFeature{module: module}
}
func (f *SpanAttrMappingFeature) AgentFeatureType() agentConf.AgentFeatureType {
return SpanAttrMappingFeatureType
}
func (f *SpanAttrMappingFeature) RecommendAgentConfig(
orgId valuer.UUID,
currentConfYaml []byte,
configVersion *opamptypes.AgentConfigVersion,
) ([]byte, string, error) {
ctx := context.Background()
groups, err := f.getEnabled(ctx, orgId)
if err != nil {
return nil, "", err
}
updatedConf, err := generateCollectorConfigWithSpanMapping(currentConfYaml, groups)
if err != nil {
return nil, "", err
}
serialized, err := json.Marshal(groups)
if err != nil {
return nil, "", err
}
return updatedConf, string(serialized), nil
}
// getEnabled returns enabled groups alongside their enabled mappers. Groups
// with no enabled mappers are still included so the collector sees the
// exists_any condition, even if the attributes list is empty.
func (f *SpanAttrMappingFeature) getEnabled(ctx context.Context, orgId valuer.UUID) ([]enabledGroup, error) {
if f.module == nil {
return nil, nil
}
enabled := true
groups, err := f.module.ListGroups(ctx, orgId, &spantypes.ListSpanMapperGroupsQuery{Enabled: &enabled})
if err != nil {
return nil, err
}
out := make([]enabledGroup, 0, len(groups))
for _, g := range groups {
mappers, err := f.module.ListMappers(ctx, orgId, g.ID)
if err != nil {
return nil, err
}
enabledMappers := make([]*spantypes.SpanMapper, 0, len(mappers))
for _, m := range mappers {
if m.Enabled {
enabledMappers = append(enabledMappers, m)
}
}
out = append(out, enabledGroup{group: g, mappers: enabledMappers})
}
return out, nil
}

View File

@@ -0,0 +1,137 @@
package implspanmapper
import (
"bytes"
"fmt"
"sort"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"gopkg.in/yaml.v3"
)
const processorName = "signozspanmappingprocessor"
const (
// Collector context values (see signozspanmappingprocessor.ContextAttributes/ContextResource).
ctxAttributes = "attributes"
ctxResource = "resource"
// Collector action values.
actionCopy = "copy"
actionMove = "move"
// Source key prefix the collector treats as "read from resource".
resourcePrefix = "resource."
)
// enabledGroup pairs an enabled group with its enabled mappers.
type enabledGroup struct {
group *spantypes.SpanMapperGroup
mappers []*spantypes.SpanMapper
}
// buildProcessorConfig converts enabled groups + mappers into the
// signozspanmappingprocessor config shape.
func buildProcessorConfig(groups []enabledGroup) *spantypes.SpanMappingProcessorConfig {
out := make([]spantypes.SpanMappingGroup, 0, len(groups))
for _, eg := range groups {
rules := make([]spantypes.SpanMappingAttribute, 0, len(eg.mappers))
for _, m := range eg.mappers {
rules = append(rules, buildAttributeRule(m))
}
out = append(out, spantypes.SpanMappingGroup{
ID: eg.group.Name,
ExistsAny: spantypes.SpanMappingExistsAny{
Attributes: eg.group.Condition.Attributes,
Resource: eg.group.Condition.Resource,
},
Attributes: rules,
})
}
return &spantypes.SpanMappingProcessorConfig{Groups: out}
}
// buildAttributeRule maps a single SpanMapper to a collector AttributeRule.
// Sources are sorted by Priority DESC (highest-priority first), and read-from-
// resource sources are encoded via the "resource." prefix. The rule-level
// action is derived from the sources' operations (all sources within one
// mapper are expected to share the same operation; the highest-priority
// source's operation is used).
func buildAttributeRule(m *spantypes.SpanMapper) spantypes.SpanMappingAttribute {
sources := make([]spantypes.SpanMapperSource, len(m.Config.Sources))
copy(sources, m.Config.Sources)
sort.SliceStable(sources, func(i, j int) bool { return sources[i].Priority > sources[j].Priority })
keys := make([]string, 0, len(sources))
for _, s := range sources {
if s.Context == spantypes.FieldContextResource {
keys = append(keys, resourcePrefix+s.Key)
} else {
keys = append(keys, s.Key)
}
}
action := actionCopy
if len(sources) > 0 && sources[0].Operation == spantypes.SpanMapperOperationMove {
action = actionMove
}
ctx := ctxAttributes
if m.FieldContext == spantypes.FieldContextResource {
ctx = ctxResource
}
return spantypes.SpanMappingAttribute{
Target: m.Name,
Context: ctx,
Action: action,
Sources: keys,
}
}
// generateCollectorConfigWithSpanMapping injects (or replaces) the
// signozspanmappingprocessor block in the collector YAML. Pipeline wiring is
// handled by the collector's baseline config, not here.
func generateCollectorConfigWithSpanMapping(
currentConfYaml []byte,
groups []enabledGroup,
) ([]byte, error) {
// Empty input: nothing to inject into. Pass through unchanged so we don't
// turn it into "null\n" or fail on yaml.v3's EOF.
if len(bytes.TrimSpace(currentConfYaml)) == 0 {
return currentConfYaml, nil
}
var collectorConf map[string]any
if err := yaml.Unmarshal(currentConfYaml, &collectorConf); err != nil {
return nil, fmt.Errorf("failed to unmarshal collector config: %w", err)
}
if collectorConf == nil {
collectorConf = map[string]any{}
}
processors := map[string]any{}
if collectorConf["processors"] != nil {
if p, ok := collectorConf["processors"].(map[string]any); ok {
processors = p
}
}
procConfig := buildProcessorConfig(groups)
configBytes, err := yaml.Marshal(procConfig)
if err != nil {
return nil, fmt.Errorf("failed to marshal span attr mapping processor config: %w", err)
}
var configMap any
if err := yaml.Unmarshal(configBytes, &configMap); err != nil {
return nil, fmt.Errorf("failed to re-unmarshal span attr mapping processor config: %w", err)
}
processors[processorName] = configMap
collectorConf["processors"] = processors
return yaml.Marshal(collectorConf)
}

View File

@@ -0,0 +1,301 @@
package implspanmapper
import (
"context"
"net/http"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/http/binding"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
type handler struct {
module spanmapper.Module
providerSettings factory.ProviderSettings
}
func NewHandler(module spanmapper.Module, providerSettings factory.ProviderSettings) spanmapper.Handler {
return &handler{module: module, providerSettings: providerSettings}
}
// ListGroups handles GET /api/v1/span_mapper_groups.
func (h *handler) ListGroups(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
var q spantypes.ListSpanMapperGroupsQuery
if err := binding.Query.BindQuery(r.URL.Query(), &q); err != nil {
render.Error(rw, err)
return
}
groups, err := h.module.ListGroups(ctx, orgID, &q)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, spantypes.NewGettableSpanMapperGroups(groups))
}
// CreateGroup handles POST /api/v1/span_mapper_groups.
func (h *handler) CreateGroup(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
req := new(spantypes.PostableSpanMapperGroup)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
group := spantypes.NewSpanMapperGroupFromPostable(req)
err = h.module.CreateGroup(ctx, orgID, claims.Email, group)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusCreated, group)
}
// UpdateGroup handles PUT /api/v1/span_mapper_groups/{id}.
func (h *handler) UpdateGroup(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
id, err := groupIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
req := new(spantypes.UpdatableSpanMapperGroup)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
err = h.module.UpdateGroup(ctx, orgID, id, claims.Email, spantypes.NewSpanMapperGroupFromUpdatable(req))
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}
// DeleteGroup handles DELETE /api/v1/span_mapper_groups/{id}.
func (h *handler) DeleteGroup(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
id, err := groupIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
if err := h.module.DeleteGroup(ctx, orgID, id); err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}
// ListMappers handles GET /api/v1/span_mapper_groups/{id}/span_mappers.
func (h *handler) ListMappers(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
groupID, err := groupIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
mappers, err := h.module.ListMappers(ctx, orgID, groupID)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, spantypes.NewGettableSpanMappers(mappers))
}
// CreateMapper handles POST /api/v1/span_mapper_groups/{id}/span_mappers.
func (h *handler) CreateMapper(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
groupID, err := groupIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
req := new(spantypes.PostableSpanMapper)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
mapper := spantypes.NewSpanMapperFromPostable(req)
err = h.module.CreateMapper(ctx, orgID, groupID, claims.Email, mapper)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusCreated, mapper)
}
// UpdateMapper handles PUT /api/v1/span_mapper_groups/{groupId}/span_mappers/{mapperId}.
func (h *handler) UpdateMapper(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
groupID, err := groupIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
mapperID, err := mapperIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
req := new(spantypes.UpdatableSpanMapper)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
err = h.module.UpdateMapper(ctx, orgID, groupID, mapperID, claims.Email, spantypes.NewSpanMapperFromUpdatable(req))
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}
// DeleteMapper handles DELETE /api/v1/span_mapper_groups/{groupId}/span_mappers/{mapperId}.
func (h *handler) DeleteMapper(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
groupID, err := groupIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
mapperID, err := mapperIDFromPath(r)
if err != nil {
render.Error(rw, err)
return
}
if err := h.module.DeleteMapper(ctx, orgID, groupID, mapperID); err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}
// groupIDFromPath extracts and validates the {id} or {groupId} path variable.
func groupIDFromPath(r *http.Request) (valuer.UUID, error) {
vars := mux.Vars(r)
raw := vars["groupId"]
id, err := valuer.NewUUID(raw)
if err != nil {
return valuer.UUID{}, errors.Wrapf(err, errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "group id is not a valid uuid")
}
return id, nil
}
// mapperIDFromPath extracts and validates the {mapperId} path variable.
func mapperIDFromPath(r *http.Request) (valuer.UUID, error) {
raw := mux.Vars(r)["mapperId"]
id, err := valuer.NewUUID(raw)
if err != nil {
return valuer.UUID{}, errors.Wrapf(err, errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "mapper id is not a valid uuid")
}
return id, nil
}

View File

@@ -0,0 +1,179 @@
package implspanmapper
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type module struct {
store spantypes.Store
}
func NewModule(store spantypes.Store) spanmapper.Module {
return &module{store: store}
}
func (m *module) ListGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.SpanMapperGroup, error) {
storables, err := m.store.ListSpanMapperGroups(ctx, orgID, q)
if err != nil {
return nil, err
}
return spantypes.NewSpanMapperGroupsFromStorableGroups(storables), nil
}
func (m *module) GetGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*spantypes.SpanMapperGroup, error) {
s, err := m.store.GetSpanMapperGroup(ctx, orgID, id)
if err != nil {
return nil, err
}
return spantypes.NewSpanMapperGroupFromStorable(s), nil
}
func (m *module) CreateGroup(ctx context.Context, orgID valuer.UUID, createdBy string, group *spantypes.SpanMapperGroup) error {
now := time.Now()
group.ID = valuer.GenerateUUID()
group.OrgID = orgID
group.CreatedAt = now
group.UpdatedAt = now
group.CreatedBy = createdBy
group.UpdatedBy = createdBy
storable := &spantypes.StorableSpanMapperGroup{
Identifiable: types.Identifiable{ID: group.ID},
TimeAuditable: types.TimeAuditable{CreatedAt: now, UpdatedAt: now},
UserAuditable: types.UserAuditable{CreatedBy: createdBy, UpdatedBy: createdBy},
OrgID: orgID,
Name: group.Name,
Category: group.Category,
Condition: group.Condition,
Enabled: group.Enabled,
}
if err := m.store.CreateSpanMapperGroup(ctx, storable); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (m *module) UpdateGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID, updatedBy string, group *spantypes.SpanMapperGroup) error {
existing, err := m.store.GetSpanMapperGroup(ctx, orgID, id)
if err != nil {
return err
}
if group.Name != "" {
existing.Name = group.Name
}
if len(group.Condition.Attributes) > 0 || len(group.Condition.Resource) > 0 {
existing.Condition = group.Condition
}
existing.Enabled = group.Enabled
existing.UpdatedAt = time.Now()
existing.UpdatedBy = updatedBy
if err := m.store.UpdateSpanMapperGroup(ctx, existing); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (m *module) DeleteGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID) error {
if err := m.store.DeleteSpanMapperGroup(ctx, orgID, id); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (m *module) ListMappers(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID) ([]*spantypes.SpanMapper, error) {
storables, err := m.store.ListSpanMappers(ctx, orgID, groupID)
if err != nil {
return nil, err
}
return spantypes.NewSpanMappersFromStorableSpanMappers(storables), nil
}
func (m *module) GetMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID) (*spantypes.SpanMapper, error) {
s, err := m.store.GetSpanMapper(ctx, orgID, groupID, id)
if err != nil {
return nil, err
}
return spantypes.NewSpanMapperFromStorable(s), nil
}
func (m *module) CreateMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, createdBy string, mapper *spantypes.SpanMapper) error {
// Ensure the group belongs to the org before inserting the child row.
if _, err := m.store.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
return err
}
now := time.Now()
mapper.ID = valuer.GenerateUUID()
mapper.GroupID = groupID
mapper.CreatedAt = now
mapper.UpdatedAt = now
mapper.CreatedBy = createdBy
mapper.UpdatedBy = createdBy
storable := &spantypes.StorableSpanMapper{
Identifiable: types.Identifiable{ID: mapper.ID},
TimeAuditable: types.TimeAuditable{CreatedAt: now, UpdatedAt: now},
UserAuditable: types.UserAuditable{CreatedBy: createdBy, UpdatedBy: createdBy},
GroupID: groupID,
Name: mapper.Name,
FieldContext: mapper.FieldContext,
Config: mapper.Config,
Enabled: mapper.Enabled,
}
if err := m.store.CreateSpanMapper(ctx, storable); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (m *module) UpdateMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID, updatedBy string, mapper *spantypes.SpanMapper) error {
existing, err := m.store.GetSpanMapper(ctx, orgID, groupID, id)
if err != nil {
return err
}
if mapper.FieldContext != (spantypes.FieldContext{}) {
existing.FieldContext = mapper.FieldContext
}
if mapper.Config.Sources != nil {
existing.Config = mapper.Config
}
existing.Enabled = mapper.Enabled
existing.UpdatedAt = time.Now()
existing.UpdatedBy = updatedBy
if err := m.store.UpdateSpanMapper(ctx, existing); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (m *module) DeleteMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID) error {
if err := m.store.DeleteSpanMapper(ctx, orgID, groupID, id); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}

View File

@@ -0,0 +1,236 @@
package implspanmapper
import (
"context"
"database/sql"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type store struct {
sqlstore sqlstore.SQLStore
}
func NewStore(sqlstore sqlstore.SQLStore) spantypes.Store {
return &store{sqlstore: sqlstore}
}
func (s *store) ListSpanMapperGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.StorableSpanMapperGroup, error) {
groups := make([]*spantypes.StorableSpanMapperGroup, 0)
sel := s.sqlstore.
BunDB().
NewSelect().
Model(&groups).
Where("org_id = ?", orgID)
if q != nil {
if q.Category != nil {
sel = sel.Where("category = ?", valuer.String(*q.Category).StringValue())
}
if q.Enabled != nil {
sel = sel.Where("enabled = ?", *q.Enabled)
}
}
if err := sel.Order("created_at DESC").Scan(ctx); err != nil {
return nil, err
}
return groups, nil
}
func (s *store) GetSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) (*spantypes.StorableSpanMapperGroup, error) {
group := new(spantypes.StorableSpanMapperGroup)
err := s.sqlstore.
BunDB().
NewSelect().
Model(group).
Where("org_id = ?", orgID).
Where("id = ?", id).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return nil, s.sqlstore.WrapNotFoundErrf(err, spantypes.ErrCodeMappingGroupNotFound, "span mapper group %s not found", id)
}
return nil, err
}
return group, nil
}
func (s *store) CreateSpanMapperGroup(ctx context.Context, group *spantypes.StorableSpanMapperGroup) error {
_, err := s.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(group).
Exec(ctx)
if err != nil {
return s.sqlstore.WrapAlreadyExistsErrf(err, spantypes.ErrCodeMappingGroupAlreadyExists, "span mapper group %q already exists", group.Name)
}
return nil
}
func (s *store) UpdateSpanMapperGroup(ctx context.Context, group *spantypes.StorableSpanMapperGroup) error {
res, err := s.sqlstore.
BunDBCtx(ctx).
NewUpdate().
Model(group).
Where("org_id = ?", group.OrgID).
Where("id = ?", group.ID).
ExcludeColumn("id", "org_id", "created_at", "created_by").
Exec(ctx)
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
return errors.Newf(errors.TypeNotFound, spantypes.ErrCodeMappingGroupNotFound, "span mapper group %s not found", group.ID)
}
return nil
}
func (s *store) DeleteSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) error {
tx, err := s.sqlstore.BunDBCtx(ctx).BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() { _ = tx.Rollback() }()
// Cascade: remove mappers belonging to this group first.
if _, err := tx.NewDelete().
Model((*spantypes.StorableSpanMapper)(nil)).
Where("group_id = ?", id).
Exec(ctx); err != nil {
return err
}
res, err := tx.NewDelete().
Model((*spantypes.StorableSpanMapperGroup)(nil)).
Where("org_id = ?", orgID).
Where("id = ?", id).
Exec(ctx)
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
return errors.Newf(errors.TypeNotFound, spantypes.ErrCodeMappingGroupNotFound, "span mapper group %s not found", id)
}
return tx.Commit()
}
func (s *store) ListSpanMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*spantypes.StorableSpanMapper, error) {
mappers := make([]*spantypes.StorableSpanMapper, 0)
// Scope by org via the parent group's org_id.
if _, err := s.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
return nil, err
}
if err := s.sqlstore.
BunDB().
NewSelect().
Model(&mappers).
Where("group_id = ?", groupID).
Order("created_at DESC").
Scan(ctx); err != nil {
return nil, err
}
return mappers, nil
}
func (s *store) GetSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) (*spantypes.StorableSpanMapper, error) {
// Ensure the group belongs to the org.
if _, err := s.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
return nil, err
}
mapper := new(spantypes.StorableSpanMapper)
err := s.sqlstore.
BunDB().
NewSelect().
Model(mapper).
Where("group_id = ?", groupID).
Where("id = ?", id).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return nil, s.sqlstore.WrapNotFoundErrf(err, spantypes.ErrCodeMapperNotFound, "span mapper %s not found", id)
}
return nil, err
}
return mapper, nil
}
func (s *store) CreateSpanMapper(ctx context.Context, mapper *spantypes.StorableSpanMapper) error {
_, err := s.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(mapper).
Exec(ctx)
if err != nil {
return s.sqlstore.WrapAlreadyExistsErrf(err, spantypes.ErrCodeMapperAlreadyExists, "span mapper %q already exists", mapper.Name)
}
return nil
}
func (s *store) UpdateSpanMapper(ctx context.Context, mapper *spantypes.StorableSpanMapper) error {
res, err := s.sqlstore.
BunDBCtx(ctx).
NewUpdate().
Model(mapper).
Where("group_id = ?", mapper.GroupID).
Where("id = ?", mapper.ID).
ExcludeColumn("id", "group_id", "created_at", "created_by").
Exec(ctx)
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
return errors.Newf(errors.TypeNotFound, spantypes.ErrCodeMapperNotFound, "span mapper %s not found", mapper.ID)
}
return nil
}
func (s *store) DeleteSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error {
if _, err := s.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
return err
}
res, err := s.sqlstore.
BunDBCtx(ctx).
NewDelete().
Model((*spantypes.StorableSpanMapper)(nil)).
Where("group_id = ?", groupID).
Where("id = ?", id).
Exec(ctx)
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected == 0 {
return errors.Newf(errors.TypeNotFound, spantypes.ErrCodeMapperNotFound, "span mapper %s not found", id)
}
return nil
}

View File

@@ -0,0 +1,41 @@
package spanmapper
import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Module defines the business logic for span attribute mapping groups and mappers.
type Module interface {
// Group operations
ListGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.SpanMapperGroup, error)
GetGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*spantypes.SpanMapperGroup, error)
CreateGroup(ctx context.Context, orgID valuer.UUID, createdBy string, group *spantypes.SpanMapperGroup) error
UpdateGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID, updatedBy string, group *spantypes.SpanMapperGroup) error
DeleteGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID) error
// Mapper operations
ListMappers(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID) ([]*spantypes.SpanMapper, error)
GetMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID) (*spantypes.SpanMapper, error)
CreateMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, createdBy string, mapper *spantypes.SpanMapper) error
UpdateMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID, updatedBy string, mapper *spantypes.SpanMapper) error
DeleteMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID) error
}
// Handler defines the HTTP handler interface for mapping group and mapper endpoints.
type Handler interface {
// Group handlers
ListGroups(rw http.ResponseWriter, r *http.Request)
CreateGroup(rw http.ResponseWriter, r *http.Request)
UpdateGroup(rw http.ResponseWriter, r *http.Request)
DeleteGroup(rw http.ResponseWriter, r *http.Request)
// Mapper handlers
ListMappers(rw http.ResponseWriter, r *http.Request)
CreateMapper(rw http.ResponseWriter, r *http.Request)
UpdateMapper(rw http.ResponseWriter, r *http.Request)
DeleteMapper(rw http.ResponseWriter, r *http.Request)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/spanmapper/implspanmapper"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/gorilla/handlers"
@@ -129,11 +130,14 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
opAmpModel.Init(signoz.SQLStore, signoz.Instrumentation.Logger(), signoz.Modules.OrgGetter)
spanAttrMappingFeature := implspanmapper.NewSpanAttrMappingFeature(signoz.Modules.SpanMapper)
agentConfMgr, err := agentConf.Initiate(
&agentConf.ManagerOptions{
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
spanAttrMappingFeature,
},
},
)

View File

@@ -204,7 +204,7 @@ func AdjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemet
// Downstream query builder should handle multiple matching keys with their own metadata
// and not rely on this function to do so.
materialized := true
indexes := []telemetrytypes.TelemetryFieldKeySkipIndex{}
indexes := []telemetrytypes.JSONDataTypeIndex{}
fieldContextsSeen := map[telemetrytypes.FieldContext]bool{}
dataTypesSeen := map[telemetrytypes.FieldDataType]bool{}
for _, matchingKey := range matchingKeys {

View File

@@ -36,6 +36,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/serviceaccount/implserviceaccount"
"github.com/SigNoz/signoz/pkg/modules/services"
"github.com/SigNoz/signoz/pkg/modules/services/implservices"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/modules/spanmapper/implspanmapper"
"github.com/SigNoz/signoz/pkg/modules/spanpercentile"
"github.com/SigNoz/signoz/pkg/modules/spanpercentile/implspanpercentile"
"github.com/SigNoz/signoz/pkg/modules/tracedetail"
@@ -71,6 +73,7 @@ type Handlers struct {
RegistryHandler factory.Handler
CloudIntegrationHandler cloudintegration.Handler
RuleStateHistory rulestatehistory.Handler
SpanMapperHandler spanmapper.Handler
AlertmanagerHandler alertmanager.Handler
TraceDetail tracedetail.Handler
RulerHandler ruler.Handler
@@ -114,6 +117,7 @@ func NewHandlers(
RegistryHandler: registryHandler,
RuleStateHistory: implrulestatehistory.NewHandler(modules.RuleStateHistory),
CloudIntegrationHandler: implcloudintegration.NewHandler(modules.CloudIntegration),
SpanMapperHandler: implspanmapper.NewHandler(modules.SpanMapper, providerSettings),
AlertmanagerHandler: signozalertmanager.NewHandler(alertmanagerService),
TraceDetail: impltracedetail.NewHandler(modules.TraceDetail),
RulerHandler: signozruler.NewHandler(rulerService),

View File

@@ -37,6 +37,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/services/implservices"
"github.com/SigNoz/signoz/pkg/modules/session"
"github.com/SigNoz/signoz/pkg/modules/session/implsession"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/modules/spanmapper/implspanmapper"
"github.com/SigNoz/signoz/pkg/modules/spanpercentile"
"github.com/SigNoz/signoz/pkg/modules/spanpercentile/implspanpercentile"
"github.com/SigNoz/signoz/pkg/modules/tracedetail"
@@ -79,6 +81,7 @@ type Modules struct {
CloudIntegration cloudintegration.Module
RuleStateHistory rulestatehistory.Module
TraceDetail tracedetail.Module
SpanMapper spanmapper.Module
}
func NewModules(
@@ -131,5 +134,6 @@ func NewModules(
RuleStateHistory: implrulestatehistory.NewModule(implrulestatehistory.NewStore(telemetryStore, telemetryMetadataStore, providerSettings.Logger)),
CloudIntegration: cloudIntegrationModule,
TraceDetail: impltracedetail.NewModule(impltracedetail.NewTraceStore(telemetryStore), providerSettings, config.TraceDetail),
SpanMapper: implspanmapper.NewModule(implspanmapper.NewStore(sqlstore)),
}
}

View File

@@ -21,8 +21,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/modules/promote"
@@ -30,6 +30,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/rulestatehistory"
"github.com/SigNoz/signoz/pkg/modules/serviceaccount"
"github.com/SigNoz/signoz/pkg/modules/session"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/modules/tracedetail"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/querier"
@@ -74,6 +75,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ factory.Handler }{},
struct{ cloudintegration.Handler }{},
struct{ rulestatehistory.Handler }{},
struct{ spanmapper.Handler }{},
struct{ alertmanager.Handler }{},
struct{ tracedetail.Handler }{},
struct{ ruler.Handler }{},

View File

@@ -195,6 +195,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewServiceAccountAuthzactory(sqlstore),
sqlmigration.NewDropUserDeletedAtFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateAWSAllRegionsFactory(sqlstore),
sqlmigration.NewAddSpanMapperFactory(sqlstore, sqlschema),
)
}
@@ -281,6 +282,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
handlers.RegistryHandler,
handlers.CloudIntegrationHandler,
handlers.RuleStateHistory,
handlers.SpanMapperHandler,
handlers.AlertmanagerHandler,
handlers.TraceDetail,
handlers.RulerHandler,

View File

@@ -0,0 +1,119 @@
package sqlmigration
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type addSpanMapper struct {
sqlschema sqlschema.SQLSchema
sqlstore sqlstore.SQLStore
}
func NewAddSpanMapperFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("add_span_mapper"), func(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
return &addSpanMapper{sqlschema: sqlschema, sqlstore: sqlstore}, nil
})
}
func (migration *addSpanMapper) Register(migrations *migrate.Migrations) error {
return migrations.Register(migration.Up, migration.Down)
}
func (migration *addSpanMapper) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
sqls := [][]byte{}
groupSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
Name: "span_mapper_group",
Columns: []*sqlschema.Column{
{Name: "id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "created_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "updated_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "created_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "name", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "category", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "condition", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "enabled", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "true"},
},
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{
ColumnNames: []sqlschema.ColumnName{"id"},
},
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
{
ReferencingColumnName: sqlschema.ColumnName("org_id"),
ReferencedTableName: sqlschema.TableName("organizations"),
ReferencedColumnName: sqlschema.ColumnName("id"),
},
},
})
sqls = append(sqls, groupSQLs...)
groupIdxSQLs := migration.sqlschema.Operator().CreateIndex(
&sqlschema.UniqueIndex{
TableName: "span_mapper_group",
ColumnNames: []sqlschema.ColumnName{"org_id", "name"},
})
sqls = append(sqls, groupIdxSQLs...)
mapperSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
Name: "span_mapper",
Columns: []*sqlschema.Column{
{Name: "id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "created_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "updated_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "created_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "group_id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "name", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "field_context", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "config", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "enabled", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "true"},
},
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{
ColumnNames: []sqlschema.ColumnName{"id"},
},
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
{
ReferencingColumnName: sqlschema.ColumnName("group_id"),
ReferencedTableName: sqlschema.TableName("span_mapper_group"),
ReferencedColumnName: sqlschema.ColumnName("id"),
},
},
})
sqls = append(sqls, mapperSQLs...)
mapperIdxSQLs := migration.sqlschema.Operator().CreateIndex(
&sqlschema.UniqueIndex{
TableName: "span_mapper",
ColumnNames: []sqlschema.ColumnName{"group_id", "name"},
})
sqls = append(sqls, mapperIdxSQLs...)
for _, sql := range sqls {
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
return tx.Commit()
}
func (migration *addSpanMapper) Down(context.Context, *bun.DB) error {
return nil
}

View File

@@ -195,8 +195,8 @@ func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetryty
// the field genuinely holds the empty/zero value.
//
// Note: indexing is also skipped for Array Nested fields because they cannot be indexed.
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.TelemetryFieldKeySkipIndex) bool {
return telemetrytypes.MappingFieldDataTypeToJSONDataType[index.FieldDataType] == node.TerminalConfig.ElemType
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.JSONDataTypeIndex) bool {
return index.Type == node.TerminalConfig.ElemType
})
isExistsCheck := operator == qbtypes.FilterOperatorExists || operator == qbtypes.FilterOperatorNotExists
if node.TerminalConfig.ElemType.IndexSupported && indexed && !isExistsCheck {

View File

@@ -1127,12 +1127,9 @@ func buildTestTelemetryMetadataStore(t *testing.T, addIndexes bool) *telemetryty
return entry.Path == path && entry.Type == jsonType
})
if idx >= 0 {
key.Indexes = append(key.Indexes, telemetrytypes.TelemetryFieldKeySkipIndex{
Name: path,
FieldContext: telemetrytypes.FieldContextBody,
FieldDataType: fdt,
BaseColumn: LogsV2BodyV2Column,
IndexExpression: schemamigrator.JSONSubColumnIndexExpr(LogsV2BodyV2Column, path, jsonType.StringValue()),
key.Indexes = append(key.Indexes, telemetrytypes.JSONDataTypeIndex{
Type: jsonType,
ColumnExpression: schemamigrator.JSONSubColumnIndexExpr(LogsV2BodyV2Column, path, jsonType.StringValue()),
})
}
}

View File

@@ -106,7 +106,7 @@ func (t *telemetryMetaStore) buildJSONPlans(keys []*telemetrytypes.TelemetryFiel
return nil
}
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.JSONDataTypeIndex, error) {
filteredPaths := []string{}
for _, path := range paths {
// skip array paths; since they don't have any indexes
@@ -116,22 +116,47 @@ func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...st
filteredPaths = append(filteredPaths, path)
}
if len(filteredPaths) == 0 {
return make(map[string][]telemetrytypes.TelemetryFieldKeySkipIndex), nil
return make(map[string][]telemetrytypes.JSONDataTypeIndex), nil
}
// list indexes for the paths
indexes, err := t.ListLogsJSONIndexes(ctx, filteredPaths...)
indexesMap, err := t.ListLogsJSONIndexes(ctx, filteredPaths...)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to list JSON path indexes")
}
// build a set of indexes
fieldPathToIndexes := make(map[string][]telemetrytypes.TelemetryFieldKeySkipIndex)
for _, index := range indexes {
fieldPathToIndexes[index.Name] = append(fieldPathToIndexes[index.Name], index)
cleanIndexes := make(map[string][]telemetrytypes.JSONDataTypeIndex)
for path, indexes := range indexesMap {
for _, index := range indexes {
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", index.Expression)
}
jsonDataType, found := telemetrytypes.MappingStringToJSONDataType[columnType]
if !found {
t.logger.ErrorContext(ctx, "failed to map column type to JSON data type", slog.String("column_type", columnType), slog.String("column_expr", columnExpr))
continue
}
if jsonDataType == telemetrytypes.String {
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
Type: telemetrytypes.String,
ColumnExpression: columnExpr,
IndexExpression: index.Expression,
})
} else if strings.HasPrefix(index.Type, "minmax") {
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
Type: jsonDataType,
ColumnExpression: columnExpr,
IndexExpression: index.Expression,
})
}
}
}
return fieldPathToIndexes, nil
return cleanIndexes, nil
}
func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, []any) {
@@ -148,15 +173,14 @@ func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, [
filterExprs := []string{}
for _, filter := range filters {
// Remove backticks from actual expr cuz paths from metadata doesn't have backticks
filterExprs = append(filterExprs, sb.ILike("replaceAll(expr, '`', '')", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(filter))))
filterExprs = append(filterExprs, sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(filter))))
}
sb.Where(sb.Or(filterExprs...))
return sb.BuildWithFlavor(sqlbuilder.ClickHouse)
}
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
ctx = withTelemetryContext(ctx, "ListLogsJSONIndexes")
query, args := buildListLogsJSONIndexesQuery(t.telemetrystore.Cluster(), filters...)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
@@ -165,7 +189,7 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
}
defer rows.Close()
indexes := []telemetrytypes.TelemetryFieldKeySkipIndex{}
indexes := make(map[string][]schemamigrator.Index)
for rows.Next() {
var name string
var typeFull string
@@ -174,39 +198,11 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
if err := rows.Scan(&name, &typeFull, &expr, &granularity); err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to scan string indexed column")
}
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(expr)
if err != nil {
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", expr)
}
fdt, found := telemetrytypes.MappingJSONDataTypeToFieldDataType[columnType]
if !found {
t.logger.ErrorContext(ctx, "failed to map JSON data type to field data type", slog.String("column_type", columnType), slog.String("column_expr", columnExpr))
continue
}
baseColumn := ""
fieldName := ""
switch {
case strings.HasPrefix(columnExpr, telemetrylogs.BodyV2ColumnPrefix):
baseColumn = telemetrylogs.BodyV2ColumnPrefix
fieldName = strings.TrimPrefix(columnExpr, telemetrylogs.BodyV2ColumnPrefix)
case strings.HasPrefix(columnExpr, telemetrylogs.BodyPromotedColumnPrefix):
baseColumn = telemetrylogs.BodyPromotedColumnPrefix
fieldName = strings.TrimPrefix(columnExpr, telemetrylogs.BodyPromotedColumnPrefix)
}
fieldName = strings.ReplaceAll(fieldName, "`", "")
indexes = append(indexes, telemetrytypes.TelemetryFieldKeySkipIndex{
Name: fieldName,
FieldContext: telemetrytypes.FieldContextBody,
FieldDataType: fdt,
BaseColumn: baseColumn,
IndexName: name,
IndexType: typeFull,
IndexExpression: expr,
Granularity: int(granularity),
indexes[name] = append(indexes[name], schemamigrator.Index{
Name: name,
Type: typeFull,
Expression: expr,
Granularity: int(granularity),
})
}

View File

@@ -36,7 +36,7 @@ func TestBuildListLogsJSONIndexesQuery(t *testing.T) {
cluster: "test-cluster",
filters: []string{"foo", "bar"},
expectedSQL: "SELECT name, type_full, expr, granularity FROM clusterAllReplicas('test-cluster', system.data_skipping_indices) " +
"WHERE database = ? AND table = ? AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?)) AND (LOWER(replaceAll(expr, '`', '')) LIKE LOWER(?) OR LOWER(replaceAll(expr, '`', '')) LIKE LOWER(?))",
"WHERE database = ? AND table = ? AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?)) AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?))",
expectedArgs: []any{
telemetrylogs.DBName,
telemetrylogs.LogsV2LocalTableName,

View File

@@ -10,10 +10,10 @@ import (
)
type WrappedIndex struct {
JSONDataType telemetrytypes.JSONDataType `json:"-"`
FieldDataType telemetrytypes.FieldDataType `json:"fieldDataType"`
Type string `json:"type"`
Granularity int `json:"granularity"`
JSONDataType telemetrytypes.JSONDataType `json:"-"`
ColumnType string `json:"column_type"`
Type string `json:"type"`
Granularity int `json:"granularity"`
}
type PromotePath struct {
@@ -60,12 +60,12 @@ func (i *PromotePath) ValidateAndSetDefaults() error {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index granularity must be greater than 0")
}
jsonDataType, ok := telemetrytypes.MappingFieldDataTypeToJSONDataType[index.FieldDataType]
jsonDataType, ok := telemetrytypes.MappingStringToJSONDataType[index.ColumnType]
if !ok {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid column type: %s", index.FieldDataType)
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid column type: %s", index.ColumnType)
}
if !jsonDataType.IndexSupported {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index is not supported for column type: %s", index.FieldDataType)
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "index is not supported for column type: %s", index.ColumnType)
}
i.Indexes[idx].JSONDataType = jsonDataType

View File

@@ -0,0 +1,137 @@
package spantypes
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
ErrCodeMapperNotFound = errors.MustNewCode("span_attribute_mapper_not_found")
ErrCodeMapperAlreadyExists = errors.MustNewCode("span_attribute_mapper_already_exists")
ErrCodeMappingInvalidInput = errors.MustNewCode("span_attribute_mapping_invalid_input")
)
// FieldContext is where the target attribute is written.
type FieldContext struct {
valuer.String
}
var (
FieldContextSpanAttribute = FieldContext{valuer.NewString("attribute")}
FieldContextResource = FieldContext{valuer.NewString("resource")}
)
// MapperOperation determines whether the source attribute is moved (deleted) or copied.
type SpanMapperOperation struct {
valuer.String
}
var (
SpanMapperOperationMove = SpanMapperOperation{valuer.NewString("move")}
SpanMapperOperationCopy = SpanMapperOperation{valuer.NewString("copy")}
)
// MapperSource describes one candidate source for a target attribute.
type SpanMapperSource struct {
Key string `json:"key" required:"true"`
Context FieldContext `json:"context" required:"true"`
Operation SpanMapperOperation `json:"operation" required:"true"`
Priority int `json:"priority" required:"true"`
}
// MapperConfig holds the mapping logic for a single target attribute.
// It implements driver.Valuer and sql.Scanner for JSON text column storage.
type SpanMapperConfig struct {
Sources []SpanMapperSource `json:"sources" required:"true" nullable:"true"`
}
// SpanMapper is the domain model for a span attribute mapper.
type SpanMapper struct {
types.TimeAuditable
types.UserAuditable
ID valuer.UUID `json:"id" required:"true"`
GroupID valuer.UUID `json:"group_id" required:"true"`
Name string `json:"name" required:"true"`
FieldContext FieldContext `json:"field_context" required:"true"`
Config SpanMapperConfig `json:"config" required:"true"`
Enabled bool `json:"enabled" required:"true"`
}
type PostableSpanMapper struct {
Name string `json:"name" required:"true"`
FieldContext FieldContext `json:"field_context" required:"true"`
Config SpanMapperConfig `json:"config" required:"true"`
Enabled bool `json:"enabled"`
}
// UpdatableSpanMapper is the HTTP request body for updating a span mapper.
// All fields are optional; only non-nil fields are applied.
type UpdatableSpanMapper struct {
FieldContext FieldContext `json:"field_context,omitempty"`
Config *SpanMapperConfig `json:"config,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
}
type GettableSpanMapper = SpanMapper
type GettableSpanMappers struct {
Items []*GettableSpanMapper `json:"items" required:"true" nullable:"false"`
}
func (FieldContext) Enum() []any {
return []any{FieldContextSpanAttribute, FieldContextResource}
}
func (SpanMapperOperation) Enum() []any {
return []any{SpanMapperOperationMove, SpanMapperOperationCopy}
}
func NewSpanMapperFromStorable(s *StorableSpanMapper) *SpanMapper {
return &SpanMapper{
TimeAuditable: s.TimeAuditable,
UserAuditable: s.UserAuditable,
ID: s.ID,
GroupID: s.GroupID,
Name: s.Name,
FieldContext: s.FieldContext,
Config: s.Config,
Enabled: s.Enabled,
}
}
func NewSpanMapperFromPostable(req *PostableSpanMapper) *SpanMapper {
return &SpanMapper{
Name: req.Name,
FieldContext: req.FieldContext,
Config: req.Config,
Enabled: req.Enabled,
}
}
func NewSpanMapperFromUpdatable(req *UpdatableSpanMapper) *SpanMapper {
m := &SpanMapper{}
if req.FieldContext != (FieldContext{}) {
m.FieldContext = req.FieldContext
}
if req.Config != nil {
m.Config = *req.Config
}
if req.Enabled != nil {
m.Enabled = *req.Enabled
}
return m
}
func NewSpanMappersFromStorableSpanMappers(ss []*StorableSpanMapper) []*SpanMapper {
mappers := make([]*SpanMapper, len(ss))
for i, s := range ss {
mappers[i] = NewSpanMapperFromStorable(s)
}
return mappers
}
func NewGettableSpanMappers(m []*SpanMapper) *GettableSpanMappers {
return &GettableSpanMappers{Items: m}
}

View File

@@ -0,0 +1,109 @@
package spantypes
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
)
var (
ErrCodeMappingGroupNotFound = errors.MustNewCode("span_attribute_mapping_group_not_found")
ErrCodeMappingGroupAlreadyExists = errors.MustNewCode("span_attribute_mapping_group_already_exists")
)
// SpanMapperGroupCategory defaults will be llm, tool, agent but user can configure more as they want.
type SpanMapperGroupCategory valuer.String
// A group runs when any of the listed attribute/resource key patterns match.
type SpanMapperGroupCondition struct {
Attributes []string `json:"attributes" required:"true" nullable:"true"`
Resource []string `json:"resource" required:"true" nullable:"true"`
}
// SpanMapperGroup is the domain model for a span attribute mapping group.
type SpanMapperGroup struct {
types.TimeAuditable
types.UserAuditable
ID valuer.UUID `json:"id" required:"true"`
OrgID valuer.UUID `json:"orgId" required:"true"`
Name string `json:"name" required:"true"`
Category SpanMapperGroupCategory `json:"category" required:"true"`
Condition SpanMapperGroupCondition `json:"condition" required:"true"`
Enabled bool `json:"enabled" required:"true"`
}
// GettableSpanMapperGroup is the HTTP response representation of a mapping group.
type GettableSpanMapperGroup = SpanMapperGroup
type PostableSpanMapperGroup struct {
Name string `json:"name" required:"true"`
Category SpanMapperGroupCategory `json:"category" required:"true"`
Condition SpanMapperGroupCondition `json:"condition" required:"true"`
Enabled bool `json:"enabled"`
}
// UpdatableSpanMapperGroup is the HTTP request body for updating a mapping group.
// All fields are optional; only non-nil fields are applied.
type UpdatableSpanMapperGroup struct {
Name *string `json:"name,omitempty"`
Condition *SpanMapperGroupCondition `json:"condition,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
}
type ListSpanMapperGroupsQuery struct {
Category *SpanMapperGroupCategory `query:"category"`
Enabled *bool `query:"enabled"`
}
type GettableSpanMapperGroups struct {
Items []*GettableSpanMapperGroup `json:"items" required:"true" nullable:"false"`
}
func NewSpanMapperGroupFromStorable(s *StorableSpanMapperGroup) *SpanMapperGroup {
return &SpanMapperGroup{
TimeAuditable: s.TimeAuditable,
UserAuditable: s.UserAuditable,
ID: s.ID,
OrgID: s.OrgID,
Name: s.Name,
Category: s.Category,
Condition: s.Condition,
Enabled: s.Enabled,
}
}
func NewSpanMapperGroupFromPostable(p *PostableSpanMapperGroup) *SpanMapperGroup {
return &SpanMapperGroup{
Name: p.Name,
Category: p.Category,
Condition: p.Condition,
Enabled: p.Enabled,
}
}
func NewSpanMapperGroupFromUpdatable(u *UpdatableSpanMapperGroup) *SpanMapperGroup {
g := &SpanMapperGroup{}
if u.Name != nil {
g.Name = *u.Name
}
if u.Condition != nil {
g.Condition = *u.Condition
}
if u.Enabled != nil {
g.Enabled = *u.Enabled
}
return g
}
func NewSpanMapperGroupsFromStorableGroups(ss []*StorableSpanMapperGroup) []*SpanMapperGroup {
groups := make([]*SpanMapperGroup, len(ss))
for i, s := range ss {
groups[i] = NewSpanMapperGroupFromStorable(s)
}
return groups
}
func NewGettableSpanMapperGroups(g []*SpanMapperGroup) *GettableSpanMapperGroups {
return &GettableSpanMapperGroups{Items: g}
}

View File

@@ -0,0 +1,23 @@
package spantypes
type SpanMappingProcessorConfig struct {
Groups []SpanMappingGroup `yaml:"groups" json:"groups"`
}
type SpanMappingGroup struct {
ID string `yaml:"id" json:"id"`
ExistsAny SpanMappingExistsAny `yaml:"exists_any" json:"exists_any"`
Attributes []SpanMappingAttribute `yaml:"attributes" json:"attributes"`
}
type SpanMappingExistsAny struct {
Attributes []string `yaml:"attributes,omitempty" json:"attributes,omitempty"`
Resource []string `yaml:"resource,omitempty" json:"resource,omitempty"`
}
type SpanMappingAttribute struct {
Target string `yaml:"target" json:"target"`
Context string `yaml:"context,omitempty" json:"context,omitempty"`
Action string `yaml:"action,omitempty" json:"action,omitempty"`
Sources []string `yaml:"sources" json:"sources"`
}

View File

@@ -0,0 +1,87 @@
package spantypes
import (
"database/sql/driver"
"encoding/json"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
type StorableSpanMapperGroup struct {
bun.BaseModel `bun:"table:span_mapper_group,alias:span_attribute_mapping_group"`
types.Identifiable
types.TimeAuditable
types.UserAuditable
OrgID valuer.UUID `bun:"org_id,type:text,notnull"`
Name string `bun:"name,type:text,notnull"`
Category SpanMapperGroupCategory `bun:"category,type:text,notnull"`
Condition SpanMapperGroupCondition `bun:"condition,type:jsonb,notnull"`
Enabled bool `bun:"enabled,notnull,default:true"`
}
type StorableSpanMapper struct {
bun.BaseModel `bun:"table:span_mapper,alias:span_attribute_mapping"`
types.Identifiable
types.TimeAuditable
types.UserAuditable
GroupID valuer.UUID `bun:"group_id,type:text,notnull"`
Name string `bun:"name,type:text,notnull"`
FieldContext FieldContext `bun:"field_context,type:text,notnull"`
Config SpanMapperConfig `bun:"config,type:jsonb,notnull"`
Enabled bool `bun:"enabled,notnull,default:true"`
}
func (c SpanMapperGroupCondition) Value() (driver.Value, error) {
b, err := json.Marshal(c)
if err != nil {
return nil, err
}
return string(b), nil
}
func (c *SpanMapperGroupCondition) Scan(src any) error {
var raw []byte
switch v := src.(type) {
case string:
raw = []byte(v)
case []byte:
raw = v
case nil:
*c = SpanMapperGroupCondition{}
return nil
default:
return errors.NewInternalf(errors.CodeInternal, "spanmapper: cannot scan %T into Condition", src)
}
return json.Unmarshal(raw, c)
}
func (m SpanMapperConfig) Value() (driver.Value, error) {
b, err := json.Marshal(m)
if err != nil {
return nil, err
}
return string(b), nil
}
func (m *SpanMapperConfig) Scan(src any) error {
var raw []byte
switch v := src.(type) {
case string:
raw = []byte(v)
case []byte:
raw = v
case nil:
*m = SpanMapperConfig{}
return nil
default:
return errors.NewInternalf(errors.CodeInternal, "spanmapper: cannot scan %T into MapperConfig", src)
}
return json.Unmarshal(raw, m)
}

View File

@@ -0,0 +1,23 @@
package spantypes
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Store interface {
// Group operations
ListSpanMapperGroups(ctx context.Context, orgID valuer.UUID, q *ListSpanMapperGroupsQuery) ([]*StorableSpanMapperGroup, error)
GetSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) (*StorableSpanMapperGroup, error)
CreateSpanMapperGroup(ctx context.Context, group *StorableSpanMapperGroup) error
UpdateSpanMapperGroup(ctx context.Context, group *StorableSpanMapperGroup) error
DeleteSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) error
// Mapper operations
ListSpanMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*StorableSpanMapper, error)
GetSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) (*StorableSpanMapper, error)
CreateSpanMapper(ctx context.Context, mapper *StorableSpanMapper) error
UpdateSpanMapper(ctx context.Context, mapper *StorableSpanMapper) error
DeleteSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error
}

View File

@@ -37,9 +37,9 @@ type TelemetryFieldKey struct {
FieldContext FieldContext `json:"fieldContext,omitzero"`
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
JSONPlan JSONAccessPlan `json:"-"`
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
JSONPlan JSONAccessPlan `json:"-"`
Indexes []JSONDataTypeIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
Evolutions []*EvolutionEntry `json:"-"`
}
@@ -102,7 +102,7 @@ func (f TelemetryFieldKey) String() string {
if i > 0 {
sb.WriteString("; ")
}
fmt.Fprintf(&sb, "{type=%s, indexExpr=%s}", MappingFieldDataTypeToJSONDataType[index.FieldDataType].StringValue(), index.IndexExpression)
fmt.Fprintf(&sb, "{type=%s, columnExpr=%s, indexExpr=%s}", index.Type.StringValue(), index.ColumnExpression, index.IndexExpression)
}
sb.WriteString("]")
}
@@ -400,14 +400,3 @@ func NewFieldValueSelectorFromPostableFieldValueParams(params PostableFieldValue
return fieldValueSelector
}
type TelemetryFieldKeySkipIndex struct {
Name string `json:"name"` // Name is TelemetryFieldKey.Name not IndexName from ClickHouse
FieldContext FieldContext `json:"fieldContext,omitzero"`
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
BaseColumn string `json:"baseColumn"`
IndexName string `json:"indexName"`
IndexType string `json:"indexType"`
IndexExpression string `json:"indexExpression"`
Granularity int `json:"granularity"`
}

View File

@@ -1,5 +1,11 @@
package telemetrytypes
type JSONDataTypeIndex struct {
Type JSONDataType
ColumnExpression string
IndexExpression string
}
type JSONDataType struct {
str string // Store the correct case for ClickHouse
IsArray bool
@@ -26,17 +32,18 @@ var (
ArrayJSON = JSONDataType{"Array(JSON)", true, "JSON", false}
)
var MappingJSONDataTypeToFieldDataType = map[string]FieldDataType{
"String": FieldDataTypeString,
"Int64": FieldDataTypeInt64,
"Float64": FieldDataTypeFloat64,
"Bool": FieldDataTypeBool,
"Array(Nullable(String))": FieldDataTypeArrayString,
"Array(Nullable(Int64))": FieldDataTypeArrayInt64,
"Array(Nullable(Float64))": FieldDataTypeArrayFloat64,
"Array(Nullable(Bool))": FieldDataTypeArrayBool,
"Array(Dynamic)": FieldDataTypeArrayDynamic,
"Array(JSON)": FieldDataTypeArrayJSON,
var MappingStringToJSONDataType = map[string]JSONDataType{
"String": String,
"Int64": Int64,
"Float64": Float64,
"Bool": Bool,
"Dynamic": Dynamic,
"Array(Nullable(String))": ArrayString,
"Array(Nullable(Int64))": ArrayInt64,
"Array(Nullable(Float64))": ArrayFloat64,
"Array(Nullable(Bool))": ArrayBool,
"Array(Dynamic)": ArrayDynamic,
"Array(JSON)": ArrayJSON,
}
var MappingFieldDataTypeToJSONDataType = map[FieldDataType]JSONDataType{

View File

@@ -3,6 +3,7 @@ package telemetrytypes
import (
"context"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
@@ -34,7 +35,7 @@ type MetadataStore interface {
FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error)
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]TelemetryFieldKeySkipIndex, error)
ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error)
// ListPromotedPaths lists the promoted paths.
GetPromotedPaths(ctx context.Context, paths ...string) (map[string]bool, error)

View File

@@ -4,6 +4,7 @@ import (
"context"
"strings"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -17,7 +18,7 @@ type MockMetadataStore struct {
TemporalityMap map[string]metrictypes.Temporality
TypeMap map[string]metrictypes.Type
PromotedPathsMap map[string]bool
LogsJSONIndexes []telemetrytypes.TelemetryFieldKeySkipIndex
LogsJSONIndexesMap map[string][]schemamigrator.Index
ColumnEvolutionMetadataMap map[string][]*telemetrytypes.EvolutionEntry
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
// StaticFields holds signal-specific intrinsic field definitions (e.g. telemetrylogs.IntrinsicFields).
@@ -33,7 +34,7 @@ func NewMockMetadataStore() *MockMetadataStore {
TemporalityMap: make(map[string]metrictypes.Temporality),
TypeMap: make(map[string]metrictypes.Type),
PromotedPathsMap: make(map[string]bool),
LogsJSONIndexes: []telemetrytypes.TelemetryFieldKeySkipIndex{},
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
ColumnEvolutionMetadataMap: make(map[string][]*telemetrytypes.EvolutionEntry),
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
StaticFields: make(map[string]telemetrytypes.TelemetryFieldKey),
@@ -368,8 +369,8 @@ func (m *MockMetadataStore) GetPromotedPaths(ctx context.Context, paths ...strin
}
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
return m.LogsJSONIndexes, nil
func (m *MockMetadataStore) ListLogsJSONIndexes(ctx context.Context, filters ...string) (map[string][]schemamigrator.Index, error) {
return m.LogsJSONIndexesMap, nil
}
func (m *MockMetadataStore) updateColumnEvolutionMetadataForKeys(_ context.Context, keysToUpdate []*telemetrytypes.TelemetryFieldKey) map[string][]*telemetrytypes.EvolutionEntry {

View File

@@ -23,7 +23,6 @@ pytest_plugins = [
"fixtures.notification_channel",
"fixtures.alerts",
"fixtures.cloudintegrations",
"fixtures.jsontypeexporter",
"fixtures.seeder",
]
@@ -80,6 +79,6 @@ def pytest_addoption(parser: pytest.Parser):
parser.addoption(
"--schema-migrator-version",
action="store",
default="v0.144.3",
default="v0.144.2",
help="schema migrator version",
)

View File

@@ -1,473 +0,0 @@
"""
Simpler version of jsontypeexporter for test fixtures.
This exports JSON type metadata to the path_types table by parsing JSON bodies
and extracting all paths with their types, similar to how the real jsontypeexporter works.
"""
import datetime
import json
from abc import ABC
from http import HTTPStatus
from typing import (
Any,
Callable,
Dict,
Generator,
List,
Optional,
Set,
Union,
)
import numpy as np
import pytest
import requests
from fixtures import types
class JSONPathType(ABC):
"""Represents a JSON path with its type information"""
field_name: str
field_data_type: str
last_seen: np.uint64
signal: str = "logs"
field_context: str = "body"
def __init__(
self,
field_name: str,
field_data_type: str,
last_seen: Optional[datetime.datetime] = None,
) -> None:
self.field_name = field_name
self.field_data_type = field_data_type
self.signal = "logs"
self.field_context = "body"
if last_seen is None:
last_seen = datetime.datetime.now()
self.last_seen = np.uint64(int(last_seen.timestamp() * 1e9))
def np_arr(self) -> np.array:
"""Return path type data as numpy array for database insertion"""
return np.array([self.signal, self.field_context, self.field_name, self.field_data_type, self.last_seen])
# Constants matching jsontypeexporter
ARRAY_SEPARATOR = "[]." # Used in paths like "education[].name"
ARRAY_SUFFIX = "[]" # Used when traversing into array element objects
def _infer_array_type_from_type_strings(types: List[str]) -> Optional[str]:
"""
Infer array type from a list of pre-classified type strings.
Matches metadataexporter's inferArrayMask logic.
Internal type strings are: "JSON", "String", "Bool", "Float64", "Int64"
SuperTyping rules (matching Go inferArrayMask):
- JSON alone → []json
- JSON + any primitive → []dynamic
- String alone → []string; String + other → []dynamic
- Float64 wins over Int64 and Bool
- Int64 wins over Bool
- Bool alone → []bool
"""
if len(types) == 0:
return None
unique = set(types)
has_json = "JSON" in unique
# hasPrimitive mirrors Go: (hasJSON && len(unique) > 1) || (!hasJSON && len(unique) > 0)
has_primitive = (has_json and len(unique) > 1) or (not has_json and len(unique) > 0)
if has_json:
if not has_primitive:
return "[]json"
return "[]dynamic"
# ---- Primitive Type Resolution (Float > Int > Bool) ----
if "String" in unique:
if len(unique) > 1:
return "[]dynamic"
return "[]string"
if "Float64" in unique:
return "[]float64"
if "Int64" in unique:
return "[]int64"
if "Bool" in unique:
return "[]bool"
return "[]dynamic"
def _infer_array_type(elements: List[Any]) -> Optional[str]:
"""
Infer array type from raw Python list elements.
Classifies each element then delegates to _infer_array_type_from_type_strings.
"""
if len(elements) == 0:
return None
types = []
for elem in elements:
if elem is None:
continue
if isinstance(elem, dict):
types.append("JSON")
elif isinstance(elem, str):
types.append("String")
elif isinstance(elem, bool): # must be before int (bool is subclass of int)
types.append("Bool")
elif isinstance(elem, float):
types.append("Float64")
elif isinstance(elem, int):
types.append("Int64")
return _infer_array_type_from_type_strings(types)
def _python_type_to_clickhouse_type(value: Any) -> str:
"""
Convert Python type to ClickHouse JSON type string.
Maps Python types to ClickHouse JSON data types.
Matches metadataexporter's mapPCommonValueTypeToDataType.
"""
if isinstance(value, bool):
return "bool"
elif isinstance(value, int):
return "int64"
elif isinstance(value, float):
return "float64"
elif isinstance(value, str):
return "string"
elif isinstance(value, list):
# Use the sophisticated array type inference
array_type = _infer_array_type(value)
return array_type if array_type else "[]dynamic"
elif isinstance(value, dict):
return "json"
else:
return "string" # Default fallback
def _extract_json_paths(
obj: Any,
current_path: str = "",
path_types: Optional[Dict[str, Set[str]]] = None,
level: int = 0,
) -> Dict[str, Set[str]]:
"""
Recursively extract all paths and their types from a JSON object.
Matches jsontypeexporter's analyzePValue logic.
Args:
obj: The JSON object to traverse
current_path: Current path being built (e.g., "user.name")
path_types: Dictionary mapping paths to sets of types found
level: Current nesting level (for depth limiting)
Returns:
Dictionary mapping paths to sets of type strings
"""
if path_types is None:
path_types = {}
if obj is None:
# Skip null values — matches Go walkNode which errors on ValueTypeEmpty
return path_types
if isinstance(obj, dict):
# For objects, recurse into keys without recording the object itself as a type.
# Matches Go walkMap which recurses without calling ta.record on the map node.
for key, value in obj.items():
# Build the path for this key
if current_path:
new_path = f"{current_path}.{key}"
else:
new_path = key
# Recurse into the value
_extract_json_paths(value, new_path, path_types, level + 1)
elif isinstance(obj, list):
# Skip empty arrays
if len(obj) == 0:
return path_types
# Collect types from array elements (matching Go: types := make([]pcommon.ValueType, 0, s.Len()))
types = []
for item in obj:
if isinstance(item, dict):
# When traversing into array element objects, use ArraySuffix ([])
# This matches: prefix+ArraySuffix in the Go code
# Example: if current_path is "education", we use "education[]" to traverse into objects
array_prefix = current_path + ARRAY_SUFFIX if current_path else ""
for key, value in item.items():
if array_prefix:
# Use array separator: education[].name
array_path = f"{array_prefix}.{key}"
else:
array_path = key
# Recurse without increasing level (matching Go behavior)
_extract_json_paths(value, array_path, path_types, level)
types.append("JSON")
elif isinstance(item, list):
# Arrays inside arrays are not supported - skip the whole path
# Matching Go: e.logger.Error("arrays inside arrays are not supported!", ...); return nil
return path_types
elif isinstance(item, str):
types.append("String")
elif isinstance(item, bool):
types.append("Bool")
elif isinstance(item, float):
types.append("Float64")
elif isinstance(item, int):
types.append("Int64")
# Infer array type from collected types (matching Go: if mask := inferArrayMask(types); mask != 0)
if len(types) > 0:
array_type = _infer_array_type_from_type_strings(types)
if array_type and current_path:
if current_path not in path_types:
path_types[current_path] = set()
path_types[current_path].add(array_type)
else:
# Primitive value (string, number, bool)
if current_path:
if current_path not in path_types:
path_types[current_path] = set()
obj_type = _python_type_to_clickhouse_type(obj)
path_types[current_path].add(obj_type)
return path_types
def _parse_json_bodies_and_extract_paths(
json_bodies: List[str],
timestamp: Optional[datetime.datetime] = None,
) -> List[JSONPathType]:
"""
Parse JSON bodies and extract all paths with their types.
This mimics the behavior of jsontypeexporter.
Args:
json_bodies: List of JSON body strings to parse
timestamp: Timestamp to use for last_seen (defaults to now)
Returns:
List of JSONPathType objects with all discovered paths and types
"""
if timestamp is None:
timestamp = datetime.datetime.now()
# Aggregate all paths and their types across all JSON bodies
all_path_types: Dict[str, Set[str]] = {}
for json_body in json_bodies:
try:
parsed = json.loads(json_body)
_extract_json_paths(parsed, "", all_path_types, level=0)
except (json.JSONDecodeError, TypeError):
# Skip invalid JSON
continue
# Convert to list of JSONPathType objects
# Each path can have multiple types, so we create one JSONPathType per type
path_type_objects: List[JSONPathType] = []
for path, types_set in all_path_types.items():
for type_str in types_set:
path_type_objects.append(
JSONPathType(field_name=path, field_data_type=type_str, last_seen=timestamp)
)
return path_type_objects
@pytest.fixture(name="export_json_types", scope="function")
def export_json_types(
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest, # To access migrator fixture
) -> Generator[
Callable[[Union[List[JSONPathType], List[str], List[Any]]], None], Any, None
]:
"""
Fixture for exporting JSON type metadata to the path_types table.
This is a simpler version of jsontypeexporter for test fixtures.
The function can accept:
1. List of JSONPathType objects (manual specification)
2. List of JSON body strings (auto-extract paths)
3. List of Logs objects (extract from body_json field)
Usage examples:
# Manual specification
export_json_types([
JSONPathType(field_name="user.name", field_data_type="string"),
JSONPathType(field_name="user.age", field_data_type="int64"),
])
# Auto-extract from JSON strings
export_json_types([
'{"user": {"name": "alice", "age": 25}}',
'{"user": {"name": "bob", "age": 30}}',
])
# Auto-extract from Logs objects
export_json_types(logs_list)
"""
# Ensure migrator has run to create the table
try:
request.getfixturevalue("migrator")
except Exception:
# If migrator fixture is not available, that's okay - table might already exist
pass
def _export_json_types(
data: Union[
List[JSONPathType], List[str], List[Any]
], # List[Logs] but avoiding circular import
) -> None:
"""
Export JSON type metadata to signoz_metadata.distributed_field_keys table.
This table stores signal, context, path, and type information for body JSON fields.
"""
path_types: List[JSONPathType] = []
if len(data) == 0:
return
# Determine input type and convert to JSONPathType list
first_item = data[0]
if isinstance(first_item, JSONPathType):
# Already JSONPathType objects
path_types = data # type: ignore
elif isinstance(first_item, str):
# List of JSON strings - parse and extract paths
path_types = _parse_json_bodies_and_extract_paths(data) # type: ignore
else:
# Assume it's a list of Logs objects - extract body_v2
json_bodies: List[str] = []
for log in data: # type: ignore
# Try to get body_v2 attribute
if hasattr(log, "body_v2") and log.body_v2:
json_bodies.append(log.body_v2)
elif hasattr(log, "body") and log.body:
# Fallback to body if body_v2 not available
try:
# Try to parse as JSON
json.loads(log.body)
json_bodies.append(log.body)
except (json.JSONDecodeError, TypeError):
pass
if json_bodies:
path_types = _parse_json_bodies_and_extract_paths(json_bodies)
if len(path_types) == 0:
return
clickhouse.conn.insert(
database="signoz_metadata",
table="distributed_field_keys",
data=[path_type.np_arr() for path_type in path_types],
column_names=[
"signal",
"field_context",
"field_name",
"field_data_type",
"last_seen",
],
)
yield _export_json_types
# Cleanup - truncate the local table after tests (following pattern from logs fixture)
clickhouse.conn.query(
f"TRUNCATE TABLE signoz_metadata.field_keys ON CLUSTER '{clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER']}' SYNC"
)
@pytest.fixture(name="create_json_index", scope="function")
def create_json_index(
signoz: types.SigNoz,
) -> Generator[Callable[[str, List[Dict[str, Any]]], None], None, None]:
"""
Create ClickHouse data-skipping indexes on body_v2 JSON sub-columns via
POST /api/v1/logs/promote_paths.
**Must be called BEFORE insert_logs** so that newly inserted data parts are
covered by the index and the QB uses the indexed condition path.
Each entry in `paths` follows the PromotePath API shape:
{
"path": "body.user.name", # must start with "body."
"indexes": [
{
"fieldDataType": "string", # string | int64 | float64
"type": "ngrambf_v1(3, 256, 2, 0)", # or "minmax", "tokenbf_v1(...)"
"granularity": 1,
}
],
}
Teardown drops every index created during the test by querying
system.data_skipping_indices for matching expressions.
Example::
def test_foo(signoz, get_token, insert_logs, export_json_types, create_json_body_index):
token = get_token(...)
export_json_types(logs_list)
create_json_body_index(token, [
{"path": "body.user.name",
"indexes": [{"fieldDataType": "string", "type": "ngrambf_v1(3, 256, 2, 0)", "granularity": 1}]},
{"path": "body.user.age",
"indexes": [{"fieldDataType": "int64", "type": "minmax", "granularity": 1}]},
])
insert_logs(logs_list) # data inserted after index exists — index is built automatically
"""
created_paths: List[str] = []
def _create_json_body_index(token: str, paths: List[Dict[str, Any]]) -> None:
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/logs/promote_paths"),
headers={"authorization": f"Bearer {token}"},
json=paths,
timeout=30,
)
assert response.status_code == HTTPStatus.CREATED, (
f"Failed to create JSON body indexes: "
f"{response.status_code} {response.text}"
)
for path in paths:
# The API strips the "body." prefix before storing — mirror that here
# so our cleanup query uses the bare path (e.g. "user.name").
raw = path["path"].removeprefix("body.")
if raw not in created_paths:
created_paths.append(raw)
yield _create_json_body_index
if not created_paths:
return
cluster = signoz.telemetrystore.env["SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_CLUSTER"]
for path in created_paths:
result = signoz.telemetrystore.conn.query(
"SELECT name FROM system.data_skipping_indices "
"WHERE database = 'signoz_logs' AND table = 'logs_v2' "
f"AND expr LIKE '%{path}%'"
)
for (index_name,) in result.result_rows:
signoz.telemetrystore.conn.query(
f"ALTER TABLE signoz_logs.logs_v2 "
f"ON CLUSTER '{cluster}' "
f"DROP INDEX IF EXISTS `{index_name}`"
)

View File

@@ -3,7 +3,7 @@ import json
from abc import ABC
from collections.abc import Callable, Generator
from http import HTTPStatus
from typing import Any, Literal, Optional
from typing import Any, Literal
import numpy as np
import pytest
@@ -121,8 +121,6 @@ class Logs(ABC):
resources: dict[str, Any] = {},
attributes: dict[str, Any] = {},
body: str = "default body",
body_v2: Optional[str] = None,
body_promoted: Optional[str] = None,
severity_text: str = "INFO",
trace_id: str = "",
span_id: str = "",
@@ -168,33 +166,6 @@ class Logs(ABC):
# Set body
self.body = body
# Set body_v2 - if body is JSON, parse and stringify it, otherwise use empty string
# ClickHouse accepts String input for JSON column
if body_v2 is not None:
self.body_v2 = body_v2
else:
# Try to parse body as JSON; if successful use it directly,
# otherwise wrap as {"message": body} matching the normalize operator behavior.
try:
json.loads(body)
self.body_v2 = body
except (json.JSONDecodeError, TypeError):
self.body_v2 = json.dumps({"message": body})
# Set body_promoted - must be valid JSON
# Tests will explicitly pass promoted column's content, but we validate it
if body_promoted is not None:
# Validate that it's valid JSON
try:
json.loads(body_promoted)
self.body_promoted = body_promoted
except (json.JSONDecodeError, TypeError):
# If invalid, default to empty JSON object
self.body_promoted = "{}"
else:
# Default to empty JSON object (valid JSON)
self.body_promoted = "{}"
# Process resources and attributes
self.resources_string = {k: str(v) for k, v in resources.items()}
self.resource_json = {} if resource_write_mode == "legacy_only" else dict(self.resources_string)
@@ -338,8 +309,6 @@ class Logs(ABC):
self.severity_text,
self.severity_number,
self.body,
self.body_v2,
self.body_promoted,
self.attributes_string,
self.attributes_number,
self.attributes_bool,
@@ -467,51 +436,31 @@ def insert_logs_to_clickhouse(conn, logs: list[Logs]) -> None:
data=[resource_key.np_arr() for resource_key in resource_keys],
)
all_column_names = [
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"body_v2",
"body_promoted",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
]
result = conn.query(
"SELECT count() FROM system.columns WHERE database = 'signoz_logs' AND table = 'logs_v2' AND name = 'body_v2'"
)
has_json_body = result.result_rows[0][0] > 0
if has_json_body:
column_names = all_column_names
data = [log.np_arr() for log in logs]
else:
json_body_cols = {"body_v2", "body_promoted"}
keep_indices = [
i for i, c in enumerate(all_column_names) if c not in json_body_cols
]
column_names = [all_column_names[i] for i in keep_indices]
data = [log.np_arr()[keep_indices] for log in logs]
conn.insert(
database="signoz_logs",
table="distributed_logs_v2",
data=data,
column_names=column_names,
data=[log.np_arr() for log in logs],
column_names=[
"ts_bucket_start",
"resource_fingerprint",
"timestamp",
"observed_timestamp",
"id",
"trace_id",
"span_id",
"trace_flags",
"severity_text",
"severity_number",
"body",
"attributes_string",
"attributes_number",
"attributes_bool",
"resources_string",
"scope_name",
"scope_version",
"scope_string",
"resource",
],
)

View File

@@ -1,5 +1,3 @@
from typing import Optional
import docker
import pytest
from testcontainers.core.container import Network
@@ -10,32 +8,27 @@ from fixtures.logger import setup_logger
logger = setup_logger(__name__)
def create_migrator(
@pytest.fixture(name="migrator", scope="package")
def migrator(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
cache_key: str = "migrator",
env_overrides: Optional[dict] = None,
) -> types.Operation:
"""
Factory function for running schema migrations.
Accepts optional env_overrides to customize the migrator environment.
Package-scoped fixture for running schema migrations.
"""
def create() -> None:
version = request.config.getoption("--schema-migrator-version")
client = docker.from_env()
environment = dict(env_overrides) if env_overrides else {}
container = client.containers.run(
image=f"signoz/signoz-schema-migrator:{version}",
command=f"sync --replication=true --cluster-name=cluster --up= --dsn={clickhouse.env['SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN']}",
detach=True,
auto_remove=False,
network=network.id,
environment=environment,
)
result = container.wait()
@@ -54,7 +47,6 @@ def create_migrator(
detach=True,
auto_remove=False,
network=network.id,
environment=environment,
)
result = container.wait()
@@ -67,7 +59,7 @@ def create_migrator(
container.remove()
return types.Operation(name=cache_key)
return types.Operation(name="migrator")
def delete(_: types.Operation) -> None:
pass
@@ -78,27 +70,9 @@ def create_migrator(
return reuse.wrap(
request,
pytestconfig,
cache_key,
"migrator",
lambda: types.Operation(name=""),
create,
delete,
restore,
)
@pytest.fixture(name="migrator", scope="package")
def migrator(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.Operation:
"""
Package-scoped fixture for running schema migrations.
"""
return create_migrator(
network=network,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
)

View File

@@ -1,70 +0,0 @@
import pytest
from testcontainers.core.container import Network
from fixtures import types
from fixtures.migrator import create_migrator
from fixtures.signoz import create_signoz
UNSUPPORTED_CLICKHOUSE_VERSIONS = {"25.5.6"}
def pytest_collection_modifyitems(
config: pytest.Config, items: list[pytest.Item]
) -> None:
version = config.getoption("--clickhouse-version")
if version in UNSUPPORTED_CLICKHOUSE_VERSIONS:
skip = pytest.mark.skip(
reason=f"JSON body QB tests require ClickHouse > {version}"
)
for item in items:
item.add_marker(skip)
@pytest.fixture(name="migrator", scope="package")
def migrator_json(
network: Network,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.Operation:
"""
Package-scoped migrator with ENABLE_LOGS_MIGRATIONS_V2=1.
"""
return create_migrator(
network=network,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
cache_key="migrator-json-body",
env_overrides={
"ENABLE_LOGS_MIGRATIONS_V2": "1",
},
)
@pytest.fixture(name="signoz", scope="package")
def signoz_json_body(
network: Network,
zeus: types.TestContainerDocker,
gateway: types.TestContainerDocker,
sqlstore: types.TestContainerSQL,
clickhouse: types.TestContainerClickhouse,
request: pytest.FixtureRequest,
pytestconfig: pytest.Config,
) -> types.SigNoz:
"""
Package-scoped fixture for SigNoz with BODY_JSON_QUERY_ENABLED=true.
"""
return create_signoz(
network=network,
zeus=zeus,
gateway=gateway,
sqlstore=sqlstore,
clickhouse=clickhouse,
request=request,
pytestconfig=pytestconfig,
cache_key="signoz-json-body",
env_overrides={
"BODY_JSON_QUERY_ENABLED": "true",
},
)