mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-12 11:50:28 +01:00
Compare commits
3 Commits
feat/test_
...
platform-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9237e25ef8 | ||
|
|
867d626b27 | ||
|
|
0769ac0e7d |
@@ -7185,39 +7185,6 @@ components:
|
||||
- operation
|
||||
- priority
|
||||
type: object
|
||||
SpantypesSpanMappingPreviewRequest:
|
||||
properties:
|
||||
attributes:
|
||||
$ref: '#/components/schemas/SpantypesSpanMappingPreviewSpan'
|
||||
groupId:
|
||||
nullable: true
|
||||
type: string
|
||||
otlpTraces:
|
||||
additionalProperties: {}
|
||||
nullable: true
|
||||
type: object
|
||||
type: object
|
||||
SpantypesSpanMappingPreviewResponse:
|
||||
properties:
|
||||
attributes:
|
||||
$ref: '#/components/schemas/SpantypesSpanMappingPreviewSpan'
|
||||
otlpTraces:
|
||||
additionalProperties: {}
|
||||
nullable: true
|
||||
type: object
|
||||
type: object
|
||||
SpantypesSpanMappingPreviewSpan:
|
||||
nullable: true
|
||||
properties:
|
||||
resourceAttributes:
|
||||
additionalProperties: {}
|
||||
nullable: true
|
||||
type: object
|
||||
spanAttributes:
|
||||
additionalProperties: {}
|
||||
nullable: true
|
||||
type: object
|
||||
type: object
|
||||
SpantypesUpdatableSpanMapper:
|
||||
properties:
|
||||
config:
|
||||
@@ -12823,17 +12790,11 @@ paths:
|
||||
summary: Update a span mapper
|
||||
tags:
|
||||
- spanmapper
|
||||
/api/v1/span_mapper_groups/preview:
|
||||
post:
|
||||
/api/v1/stats:
|
||||
get:
|
||||
deprecated: false
|
||||
description: Previews how the org's saved attribute mappings would transform
|
||||
a sample span.
|
||||
operationId: PreviewSpanMapping
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/SpantypesSpanMappingPreviewRequest'
|
||||
description: This endpoint returns the collected stats for the organization
|
||||
operationId: GetStats
|
||||
responses:
|
||||
"200":
|
||||
content:
|
||||
@@ -12841,7 +12802,8 @@ paths:
|
||||
schema:
|
||||
properties:
|
||||
data:
|
||||
$ref: '#/components/schemas/SpantypesSpanMappingPreviewResponse'
|
||||
additionalProperties: {}
|
||||
type: object
|
||||
status:
|
||||
type: string
|
||||
required:
|
||||
@@ -12849,12 +12811,6 @@ paths:
|
||||
- data
|
||||
type: object
|
||||
description: OK
|
||||
"400":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Bad Request
|
||||
"401":
|
||||
content:
|
||||
application/json:
|
||||
@@ -12867,12 +12823,6 @@ paths:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Forbidden
|
||||
"404":
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RenderErrorResponse'
|
||||
description: Not Found
|
||||
"500":
|
||||
content:
|
||||
application/json:
|
||||
@@ -12884,9 +12834,9 @@ paths:
|
||||
- VIEWER
|
||||
- tokenizer:
|
||||
- VIEWER
|
||||
summary: Preview span attribute mapping against a sample span
|
||||
summary: Get stats
|
||||
tags:
|
||||
- spanmapper
|
||||
- stats
|
||||
/api/v1/testChannel:
|
||||
post:
|
||||
deprecated: true
|
||||
|
||||
@@ -8440,83 +8440,6 @@ export interface SpantypesSpanMapperDTO {
|
||||
updatedBy?: string;
|
||||
}
|
||||
|
||||
export type SpantypesSpanMappingPreviewRequestDTOOtlpTracesAnyOf = {
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMappingPreviewRequestDTOOtlpTraces =
|
||||
SpantypesSpanMappingPreviewRequestDTOOtlpTracesAnyOf | null;
|
||||
|
||||
export type SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributesAnyOf = {
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributes =
|
||||
SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributesAnyOf | null;
|
||||
|
||||
export type SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributesAnyOf = {
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributes =
|
||||
SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributesAnyOf | null;
|
||||
|
||||
export type SpantypesSpanMappingPreviewSpanDTOAnyOf = {
|
||||
/**
|
||||
* @type object,null
|
||||
*/
|
||||
resourceAttributes?: SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributes;
|
||||
/**
|
||||
* @type object,null
|
||||
*/
|
||||
spanAttributes?: SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributes;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMappingPreviewSpanDTO =
|
||||
SpantypesSpanMappingPreviewSpanDTOAnyOf | null;
|
||||
|
||||
export interface SpantypesSpanMappingPreviewRequestDTO {
|
||||
attributes?: SpantypesSpanMappingPreviewSpanDTO | null;
|
||||
/**
|
||||
* @type string,null
|
||||
*/
|
||||
groupId?: string | null;
|
||||
/**
|
||||
* @type object,null
|
||||
*/
|
||||
otlpTraces?: SpantypesSpanMappingPreviewRequestDTOOtlpTraces;
|
||||
}
|
||||
|
||||
export type SpantypesSpanMappingPreviewResponseDTOOtlpTracesAnyOf = {
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type SpantypesSpanMappingPreviewResponseDTOOtlpTraces =
|
||||
SpantypesSpanMappingPreviewResponseDTOOtlpTracesAnyOf | null;
|
||||
|
||||
export interface SpantypesSpanMappingPreviewResponseDTO {
|
||||
attributes?: SpantypesSpanMappingPreviewSpanDTO | null;
|
||||
/**
|
||||
* @type object,null
|
||||
*/
|
||||
otlpTraces?: SpantypesSpanMappingPreviewResponseDTOOtlpTraces;
|
||||
}
|
||||
|
||||
export interface SpantypesUpdatableSpanMapperDTO {
|
||||
config?: SpantypesSpanMapperConfigDTO;
|
||||
/**
|
||||
@@ -9823,8 +9746,13 @@ export type UpdateSpanMapperPathParameters = {
|
||||
groupId: string;
|
||||
mapperId: string;
|
||||
};
|
||||
export type PreviewSpanMapping200 = {
|
||||
data: SpantypesSpanMappingPreviewResponseDTO;
|
||||
export type GetStats200Data = { [key: string]: unknown };
|
||||
|
||||
export type GetStats200 = {
|
||||
/**
|
||||
* @type object
|
||||
*/
|
||||
data: GetStats200Data;
|
||||
/**
|
||||
* @type string
|
||||
*/
|
||||
|
||||
@@ -27,11 +27,9 @@ import type {
|
||||
ListSpanMapperGroupsParams,
|
||||
ListSpanMappers200,
|
||||
ListSpanMappersPathParameters,
|
||||
PreviewSpanMapping200,
|
||||
RenderErrorResponseDTO,
|
||||
SpantypesPostableSpanMapperDTO,
|
||||
SpantypesPostableSpanMapperGroupDTO,
|
||||
SpantypesSpanMappingPreviewRequestDTO,
|
||||
SpantypesUpdatableSpanMapperDTO,
|
||||
SpantypesUpdatableSpanMapperGroupDTO,
|
||||
UpdateSpanMapperGroupPathParameters,
|
||||
@@ -782,86 +780,3 @@ export const useUpdateSpanMapper = <
|
||||
> => {
|
||||
return useMutation(getUpdateSpanMapperMutationOptions(options));
|
||||
};
|
||||
/**
|
||||
* Previews how the org's saved attribute mappings would transform a sample span.
|
||||
* @summary Preview span attribute mapping against a sample span
|
||||
*/
|
||||
export const previewSpanMapping = (
|
||||
spantypesSpanMappingPreviewRequestDTO?: BodyType<SpantypesSpanMappingPreviewRequestDTO>,
|
||||
signal?: AbortSignal,
|
||||
) => {
|
||||
return GeneratedAPIInstance<PreviewSpanMapping200>({
|
||||
url: `/api/v1/span_mapper_groups/preview`,
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
data: spantypesSpanMappingPreviewRequestDTO,
|
||||
signal,
|
||||
});
|
||||
};
|
||||
|
||||
export const getPreviewSpanMappingMutationOptions = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
TContext = unknown,
|
||||
>(options?: {
|
||||
mutation?: UseMutationOptions<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
|
||||
TContext
|
||||
>;
|
||||
}): UseMutationOptions<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
|
||||
TContext
|
||||
> => {
|
||||
const mutationKey = ['previewSpanMapping'];
|
||||
const { mutation: mutationOptions } = options
|
||||
? options.mutation &&
|
||||
'mutationKey' in options.mutation &&
|
||||
options.mutation.mutationKey
|
||||
? options
|
||||
: { ...options, mutation: { ...options.mutation, mutationKey } }
|
||||
: { mutation: { mutationKey } };
|
||||
|
||||
const mutationFn: MutationFunction<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>,
|
||||
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> }
|
||||
> = (props) => {
|
||||
const { data } = props ?? {};
|
||||
|
||||
return previewSpanMapping(data);
|
||||
};
|
||||
|
||||
return { mutationFn, ...mutationOptions };
|
||||
};
|
||||
|
||||
export type PreviewSpanMappingMutationResult = NonNullable<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>
|
||||
>;
|
||||
export type PreviewSpanMappingMutationBody =
|
||||
| BodyType<SpantypesSpanMappingPreviewRequestDTO>
|
||||
| undefined;
|
||||
export type PreviewSpanMappingMutationError = ErrorType<RenderErrorResponseDTO>;
|
||||
|
||||
/**
|
||||
* @summary Preview span attribute mapping against a sample span
|
||||
*/
|
||||
export const usePreviewSpanMapping = <
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
TContext = unknown,
|
||||
>(options?: {
|
||||
mutation?: UseMutationOptions<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
|
||||
TContext
|
||||
>;
|
||||
}): UseMutationResult<
|
||||
Awaited<ReturnType<typeof previewSpanMapping>>,
|
||||
TError,
|
||||
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
|
||||
TContext
|
||||
> => {
|
||||
return useMutation(getPreviewSpanMappingMutationOptions(options));
|
||||
};
|
||||
|
||||
96
frontend/src/api/generated/services/stats/index.ts
Normal file
96
frontend/src/api/generated/services/stats/index.ts
Normal file
@@ -0,0 +1,96 @@
|
||||
/**
|
||||
* ! Do not edit manually
|
||||
* * The file has been auto-generated using Orval for SigNoz
|
||||
* * regenerate with 'pnpm generate:api'
|
||||
* SigNoz
|
||||
*/
|
||||
import { useQuery } from 'react-query';
|
||||
import type {
|
||||
InvalidateOptions,
|
||||
QueryClient,
|
||||
QueryFunction,
|
||||
QueryKey,
|
||||
UseQueryOptions,
|
||||
UseQueryResult,
|
||||
} from 'react-query';
|
||||
|
||||
import type { GetStats200, RenderErrorResponseDTO } from '../sigNoz.schemas';
|
||||
|
||||
import { GeneratedAPIInstance } from '../../../generatedAPIInstance';
|
||||
import type { ErrorType } from '../../../generatedAPIInstance';
|
||||
|
||||
/**
|
||||
* This endpoint returns the collected stats for the organization
|
||||
* @summary Get stats
|
||||
*/
|
||||
export const getStats = (signal?: AbortSignal) => {
|
||||
return GeneratedAPIInstance<GetStats200>({
|
||||
url: `/api/v1/stats`,
|
||||
method: 'GET',
|
||||
signal,
|
||||
});
|
||||
};
|
||||
|
||||
export const getGetStatsQueryKey = () => {
|
||||
return [`/api/v1/stats`] as const;
|
||||
};
|
||||
|
||||
export const getGetStatsQueryOptions = <
|
||||
TData = Awaited<ReturnType<typeof getStats>>,
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
>(options?: {
|
||||
query?: UseQueryOptions<Awaited<ReturnType<typeof getStats>>, TError, TData>;
|
||||
}) => {
|
||||
const { query: queryOptions } = options ?? {};
|
||||
|
||||
const queryKey = queryOptions?.queryKey ?? getGetStatsQueryKey();
|
||||
|
||||
const queryFn: QueryFunction<Awaited<ReturnType<typeof getStats>>> = ({
|
||||
signal,
|
||||
}) => getStats(signal);
|
||||
|
||||
return { queryKey, queryFn, ...queryOptions } as UseQueryOptions<
|
||||
Awaited<ReturnType<typeof getStats>>,
|
||||
TError,
|
||||
TData
|
||||
> & { queryKey: QueryKey };
|
||||
};
|
||||
|
||||
export type GetStatsQueryResult = NonNullable<
|
||||
Awaited<ReturnType<typeof getStats>>
|
||||
>;
|
||||
export type GetStatsQueryError = ErrorType<RenderErrorResponseDTO>;
|
||||
|
||||
/**
|
||||
* @summary Get stats
|
||||
*/
|
||||
|
||||
export function useGetStats<
|
||||
TData = Awaited<ReturnType<typeof getStats>>,
|
||||
TError = ErrorType<RenderErrorResponseDTO>,
|
||||
>(options?: {
|
||||
query?: UseQueryOptions<Awaited<ReturnType<typeof getStats>>, TError, TData>;
|
||||
}): UseQueryResult<TData, TError> & { queryKey: QueryKey } {
|
||||
const queryOptions = getGetStatsQueryOptions(options);
|
||||
|
||||
const query = useQuery(queryOptions) as UseQueryResult<TData, TError> & {
|
||||
queryKey: QueryKey;
|
||||
};
|
||||
|
||||
return { ...query, queryKey: queryOptions.queryKey };
|
||||
}
|
||||
|
||||
/**
|
||||
* @summary Get stats
|
||||
*/
|
||||
export const invalidateGetStats = async (
|
||||
queryClient: QueryClient,
|
||||
options?: InvalidateOptions,
|
||||
): Promise<QueryClient> => {
|
||||
await queryClient.invalidateQueries(
|
||||
{ queryKey: getGetStatsQueryKey() },
|
||||
options,
|
||||
);
|
||||
|
||||
return queryClient;
|
||||
};
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/ruler"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
@@ -70,6 +71,7 @@ type provider struct {
|
||||
traceDetailHandler tracedetail.Handler
|
||||
rulerHandler ruler.Handler
|
||||
llmPricingRuleHandler llmpricingrule.Handler
|
||||
statsHandler statsreporter.Handler
|
||||
}
|
||||
|
||||
func NewFactory(
|
||||
@@ -102,6 +104,7 @@ func NewFactory(
|
||||
llmPricingRuleHandler llmpricingrule.Handler,
|
||||
traceDetailHandler tracedetail.Handler,
|
||||
rulerHandler ruler.Handler,
|
||||
statsHandler statsreporter.Handler,
|
||||
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
|
||||
return newProvider(
|
||||
@@ -137,6 +140,7 @@ func NewFactory(
|
||||
llmPricingRuleHandler,
|
||||
traceDetailHandler,
|
||||
rulerHandler,
|
||||
statsHandler,
|
||||
)
|
||||
})
|
||||
}
|
||||
@@ -174,6 +178,7 @@ func newProvider(
|
||||
llmPricingRuleHandler llmpricingrule.Handler,
|
||||
traceDetailHandler tracedetail.Handler,
|
||||
rulerHandler ruler.Handler,
|
||||
statsHandler statsreporter.Handler,
|
||||
) (apiserver.APIServer, error) {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
|
||||
router := mux.NewRouter().UseEncodedPath()
|
||||
@@ -210,6 +215,7 @@ func newProvider(
|
||||
traceDetailHandler: traceDetailHandler,
|
||||
rulerHandler: rulerHandler,
|
||||
llmPricingRuleHandler: llmPricingRuleHandler,
|
||||
statsHandler: statsHandler,
|
||||
}
|
||||
|
||||
provider.authzMiddleware = middleware.NewAuthZ(settings.Logger(), orgGetter, authzService)
|
||||
@@ -334,6 +340,10 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := provider.addStatsReporterRoutes(router); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -51,26 +51,6 @@ func (provider *provider) addSpanMapperRoutes(router *mux.Router) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v1/span_mapper_groups/preview", handler.New(
|
||||
provider.authzMiddleware.ViewAccess(provider.spanMapperHandler.PreviewMapping),
|
||||
handler.OpenAPIDef{
|
||||
ID: "PreviewSpanMapping",
|
||||
Tags: []string{"spanmapper"},
|
||||
Summary: "Preview span attribute mapping against a sample span",
|
||||
Description: "Previews how the org's saved attribute mappings would transform a sample span.",
|
||||
Request: new(spantypes.SpanMappingPreviewRequest),
|
||||
RequestContentType: "application/json",
|
||||
Response: new(spantypes.SpanMappingPreviewResponse),
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
|
||||
Deprecated: false,
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
|
||||
},
|
||||
)).Methods(http.MethodPost).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}", handler.New(
|
||||
provider.authzMiddleware.AdminAccess(provider.spanMapperHandler.UpdateGroup),
|
||||
handler.OpenAPIDef{
|
||||
|
||||
33
pkg/apiserver/signozapiserver/statsreporter.go
Normal file
33
pkg/apiserver/signozapiserver/statsreporter.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package signozapiserver
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/http/handler"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func (provider *provider) addStatsReporterRoutes(router *mux.Router) error {
|
||||
if err := router.Handle("/api/v1/stats", handler.New(
|
||||
provider.authzMiddleware.ViewAccess(provider.statsHandler.Get),
|
||||
handler.OpenAPIDef{
|
||||
ID: "GetStats",
|
||||
Tags: []string{"stats"},
|
||||
Summary: "Get stats",
|
||||
Description: "This endpoint returns the collected stats for the organization",
|
||||
Request: nil,
|
||||
RequestContentType: "",
|
||||
Response: map[string]any{},
|
||||
ResponseContentType: "application/json",
|
||||
SuccessStatusCode: http.StatusOK,
|
||||
ErrorStatusCodes: []int{},
|
||||
Deprecated: false,
|
||||
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
|
||||
},
|
||||
)).Methods(http.MethodGet).GetError(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -273,35 +273,6 @@ func (h *handler) DeleteMapper(rw http.ResponseWriter, r *http.Request) {
|
||||
render.Success(rw, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
// PreviewMapping handles POST /api/v1/span_mapper_groups/preview.
|
||||
// used to get preview of attributes after remapping.
|
||||
func (h *handler) PreviewMapping(rw http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
req := new(spantypes.SpanMappingPreviewRequest)
|
||||
if err := binding.JSON.BindBody(r.Body, req); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.module.PreviewMapping(ctx, orgID, req)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, result)
|
||||
}
|
||||
|
||||
// groupIDFromPath extracts and validates the {id} or {groupId} path variable.
|
||||
func groupIDFromPath(r *http.Request) (valuer.UUID, error) {
|
||||
vars := mux.Vars(r)
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
@@ -103,64 +102,6 @@ func (module *module) DeleteMapper(ctx context.Context, orgID, groupID, id value
|
||||
return nil
|
||||
}
|
||||
|
||||
// PreviewMapping resolves the org's saved mappings for a sample input
|
||||
// and returns the transformed result.
|
||||
func (module *module) PreviewMapping(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) (*spantypes.SpanMappingPreviewResponse, error) {
|
||||
groups, err := module.resolvePreviewGroups(ctx, orgID, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hasAttrs := req.Attributes != nil
|
||||
hasOTLP := len(req.OtlpTraces) > 0
|
||||
if hasAttrs == hasOTLP {
|
||||
return nil, errors.New(errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "exactly one of 'attributes' or 'otlpTraces' must be provided")
|
||||
}
|
||||
|
||||
if hasAttrs {
|
||||
outResource, outSpan := spantypes.SimulateSpanMapping(groups, req.Attributes.ResourceAttributes, req.Attributes.SpanAttributes)
|
||||
return &spantypes.SpanMappingPreviewResponse{
|
||||
Attributes: &spantypes.SpanMappingPreviewSpan{ResourceAttributes: outResource, SpanAttributes: outSpan},
|
||||
}, nil
|
||||
}
|
||||
|
||||
in, err := json.Marshal(req.OtlpTraces)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "could not serialize otlpTraces payload")
|
||||
}
|
||||
out, err := spantypes.SimulateSpanMappingOTLP(groups, in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var transformed map[string]any
|
||||
if err := json.Unmarshal(out, &transformed); err != nil {
|
||||
return nil, errors.WrapInternalf(err, spantypes.ErrCodeMappingPreviewFailed, "could not deserialize transformed traces")
|
||||
}
|
||||
return &spantypes.SpanMappingPreviewResponse{OtlpTraces: transformed}, nil
|
||||
}
|
||||
|
||||
// resolvePreviewGroups resolves the config to preview against a specific saved
|
||||
// group when GroupID is set, otherwise all the org's enabled saved mappings.
|
||||
func (module *module) resolvePreviewGroups(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) ([]*spantypes.SpanMapperGroupWithMappers, error) {
|
||||
if req.GroupID != nil && *req.GroupID != "" {
|
||||
id, err := valuer.NewUUID(*req.GroupID)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "group id is not a valid uuid")
|
||||
}
|
||||
group, err := module.store.GetGroup(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mappers, err := module.store.ListMappers(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []*spantypes.SpanMapperGroupWithMappers{{Group: group, Mappers: mappers}}, nil
|
||||
}
|
||||
|
||||
return module.listEnabledGroupsWithMappers(ctx, orgID)
|
||||
}
|
||||
|
||||
func (module *module) AgentFeatureType() agentConf.AgentFeatureType {
|
||||
return spantypes.SpanAttrMappingFeatureType
|
||||
}
|
||||
|
||||
@@ -27,7 +27,6 @@ type Module interface {
|
||||
CreateMapper(ctx context.Context, orgID, groupID valuer.UUID, mapper *spantypes.SpanMapper) error
|
||||
UpdateMapper(ctx context.Context, orgID, groupID, id valuer.UUID, fieldContext spantypes.FieldContext, config *spantypes.SpanMapperConfig, enabled *bool, updatedBy string) error
|
||||
DeleteMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error
|
||||
PreviewMapping(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) (*spantypes.SpanMappingPreviewResponse, error)
|
||||
}
|
||||
|
||||
// Handler defines the HTTP handler interface for mapping group and mapper endpoints.
|
||||
@@ -43,5 +42,4 @@ type Handler interface {
|
||||
CreateMapper(rw http.ResponseWriter, r *http.Request)
|
||||
UpdateMapper(rw http.ResponseWriter, r *http.Request)
|
||||
DeleteMapper(rw http.ResponseWriter, r *http.Request)
|
||||
PreviewMapping(rw http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
|
||||
78
pkg/querier/collect.go
Normal file
78
pkg/querier/collect.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package querier
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Collect implements statsreporter.StatsCollector. It returns telemetry counts and last-observed
|
||||
// times for traces, logs and metrics. orgID is unused because the underlying tables are
|
||||
// cluster-global. Collection is best-effort: a failing query is logged and its key omitted rather
|
||||
// than failing the whole collection.
|
||||
func (q *querier) Collect(ctx context.Context, _ valuer.UUID) (map[string]any, error) {
|
||||
stats := make(map[string]any)
|
||||
|
||||
tracesTable := fmt.Sprintf("%s.%s", telemetrytraces.DBName, telemetrytraces.SpanIndexV3TableName)
|
||||
logsTable := fmt.Sprintf("%s.%s", telemetrylogs.DBName, telemetrylogs.LogsV2TableName)
|
||||
metricsTable := fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.SamplesV4TableName)
|
||||
|
||||
var traces uint64
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", tracesTable)).Scan(&traces); err == nil {
|
||||
stats["telemetry.traces.count"] = traces
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect traces count", errors.Attr(err))
|
||||
}
|
||||
|
||||
var logs uint64
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", logsTable)).Scan(&logs); err == nil {
|
||||
stats["telemetry.logs.count"] = logs
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect logs count", errors.Attr(err))
|
||||
}
|
||||
|
||||
var metrics uint64
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", metricsTable)).Scan(&metrics); err == nil {
|
||||
stats["telemetry.metrics.count"] = metrics
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect metrics count", errors.Attr(err))
|
||||
}
|
||||
|
||||
var tracesLastSeenAt time.Time
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT max(timestamp) FROM %s", tracesTable)).Scan(&tracesLastSeenAt); err == nil {
|
||||
if tracesLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.traces.last_observed.time"] = tracesLastSeenAt.UTC()
|
||||
stats["telemetry.traces.last_observed.time_unix"] = tracesLastSeenAt.Unix()
|
||||
}
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect traces last observed", errors.Attr(err))
|
||||
}
|
||||
|
||||
var logsLastSeenAt time.Time
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT fromUnixTimestamp64Nano(max(timestamp)) FROM %s", logsTable)).Scan(&logsLastSeenAt); err == nil {
|
||||
if logsLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.logs.last_observed.time"] = logsLastSeenAt.UTC()
|
||||
stats["telemetry.logs.last_observed.time_unix"] = logsLastSeenAt.Unix()
|
||||
}
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect logs last observed", errors.Attr(err))
|
||||
}
|
||||
|
||||
var metricsLastSeenAt time.Time
|
||||
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT toDateTime(max(unix_milli) / 1000) FROM %s", metricsTable)).Scan(&metricsLastSeenAt); err == nil {
|
||||
if metricsLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.metrics.last_observed.time"] = metricsLastSeenAt.UTC()
|
||||
stats["telemetry.metrics.last_observed.time_unix"] = metricsLastSeenAt.Unix()
|
||||
}
|
||||
} else {
|
||||
q.logger.DebugContext(ctx, "failed to collect metrics last observed", errors.Attr(err))
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
@@ -12,6 +13,7 @@ import (
|
||||
type Querier interface {
|
||||
QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error)
|
||||
QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream)
|
||||
statsreporter.StatsCollector
|
||||
}
|
||||
|
||||
// BucketCache is the interface for bucket-based caching.
|
||||
|
||||
@@ -49,6 +49,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/ruler"
|
||||
"github.com/SigNoz/signoz/pkg/ruler/signozruler"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
)
|
||||
@@ -80,6 +81,7 @@ type Handlers struct {
|
||||
TraceDetail tracedetail.Handler
|
||||
RulerHandler ruler.Handler
|
||||
LLMPricingRuleHandler llmpricingrule.Handler
|
||||
StatsHandler statsreporter.Handler
|
||||
}
|
||||
|
||||
func NewHandlers(
|
||||
@@ -97,6 +99,7 @@ func NewHandlers(
|
||||
registryHandler factory.Handler,
|
||||
alertmanagerService alertmanager.Alertmanager,
|
||||
rulerService ruler.Ruler,
|
||||
statsAggregator statsreporter.Aggregator,
|
||||
) Handlers {
|
||||
return Handlers{
|
||||
SavedView: implsavedview.NewHandler(modules.SavedView),
|
||||
@@ -125,5 +128,6 @@ func NewHandlers(
|
||||
TraceDetail: impltracedetail.NewHandler(modules.TraceDetail),
|
||||
RulerHandler: signozruler.NewHandler(rulerService),
|
||||
LLMPricingRuleHandler: impllmpricingrule.NewHandler(modules.LLMPricingRule),
|
||||
StatsHandler: statsreporter.NewHandler(statsAggregator),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ func TestNewHandlers(t *testing.T) {
|
||||
|
||||
querierHandler := querier.NewHandler(providerSettings, nil, nil)
|
||||
registryHandler := factory.NewHandler(nil)
|
||||
handlers := NewHandlers(modules, providerSettings, nil, querierHandler, nil, nil, nil, nil, nil, nil, nil, registryHandler, alertmanager, nil)
|
||||
handlers := NewHandlers(modules, providerSettings, nil, querierHandler, nil, nil, nil, nil, nil, nil, nil, registryHandler, alertmanager, nil, nil)
|
||||
reflectVal := reflect.ValueOf(handlers)
|
||||
for i := 0; i < reflectVal.NumField(); i++ {
|
||||
f := reflectVal.Field(i)
|
||||
|
||||
@@ -36,6 +36,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
"github.com/SigNoz/signoz/pkg/querier"
|
||||
"github.com/SigNoz/signoz/pkg/ruler"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/zeus"
|
||||
"github.com/swaggest/jsonschema-go"
|
||||
@@ -83,6 +84,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
|
||||
struct{ llmpricingrule.Handler }{},
|
||||
struct{ tracedetail.Handler }{},
|
||||
struct{ ruler.Handler }{},
|
||||
struct{ statsreporter.Handler }{},
|
||||
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -264,9 +264,9 @@ func NewSharderProviderFactories() factory.NamedMap[factory.ProviderFactory[shar
|
||||
)
|
||||
}
|
||||
|
||||
func NewStatsReporterProviderFactories(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.NamedMap[factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config]] {
|
||||
func NewStatsReporterProviderFactories(aggregator statsreporter.Aggregator, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.NamedMap[factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config]] {
|
||||
return factory.MustNewNamedMap(
|
||||
analyticsstatsreporter.NewFactory(telemetryStore, collectors, orgGetter, userGetter, tokenizer, build, analyticsConfig),
|
||||
analyticsstatsreporter.NewFactory(aggregator, orgGetter, userGetter, tokenizer, build, analyticsConfig),
|
||||
noopstatsreporter.NewFactory(),
|
||||
)
|
||||
}
|
||||
@@ -309,6 +309,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
|
||||
handlers.LLMPricingRuleHandler,
|
||||
handlers.TraceDetail,
|
||||
handlers.RulerHandler,
|
||||
handlers.StatsHandler,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -85,8 +85,8 @@ func TestNewProviderFactories(t *testing.T) {
|
||||
|
||||
userGetter := impluser.NewGetter(impluser.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), instrumentationtest.New().ToProviderSettings()), userRoleStore, flagger)
|
||||
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherEqual)
|
||||
NewStatsReporterProviderFactories(telemetryStore, []statsreporter.StatsCollector{}, orgGetter, userGetter, tokenizertest.NewMockTokenizer(t), version.Build{}, analytics.Config{Enabled: true})
|
||||
statsAggregator := statsreporter.NewAggregator(providerSettings, []statsreporter.StatsCollector{})
|
||||
NewStatsReporterProviderFactories(statsAggregator, orgGetter, userGetter, tokenizertest.NewMockTokenizer(t), version.Build{}, analytics.Config{Enabled: true})
|
||||
})
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
|
||||
@@ -499,14 +499,18 @@ func New(
|
||||
serviceAccount,
|
||||
cloudIntegrationModule,
|
||||
modules.LogsPipeline,
|
||||
querier,
|
||||
}
|
||||
|
||||
// Initialize the stats aggregator (always-on, independent of whether reporting is enabled)
|
||||
statsAggregator := statsreporter.NewAggregator(providerSettings, statsCollectors)
|
||||
|
||||
// Initialize stats reporter from the available stats reporter provider factories
|
||||
statsReporter, err := factory.NewProviderFromNamedMap(
|
||||
ctx,
|
||||
providerSettings,
|
||||
config.StatsReporter,
|
||||
NewStatsReporterProviderFactories(telemetrystore, statsCollectors, orgGetter, userGetter, tokenizer, version.Info, config.Analytics),
|
||||
NewStatsReporterProviderFactories(statsAggregator, orgGetter, userGetter, tokenizer, version.Info, config.Analytics),
|
||||
config.StatsReporter.Provider(),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -535,7 +539,7 @@ func New(
|
||||
|
||||
// Initialize all handlers for the modules
|
||||
registryHandler := factory.NewHandler(registry)
|
||||
handlers := NewHandlers(modules, providerSettings, analytics, querierHandler, licensing, global, flagger, gateway, telemetryMetadataStore, authz, zeus, registryHandler, alertmanager, rulerInstance)
|
||||
handlers := NewHandlers(modules, providerSettings, analytics, querierHandler, licensing, global, flagger, gateway, telemetryMetadataStore, authz, zeus, registryHandler, alertmanager, rulerInstance, statsAggregator)
|
||||
|
||||
// Initialize the API server (after registry so it can access service health)
|
||||
apiserverInstance, err := factory.NewProviderFromNamedMap(
|
||||
|
||||
67
pkg/statsreporter/aggregator.go
Normal file
67
pkg/statsreporter/aggregator.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package statsreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// Aggregator aggregates stats from every registered StatsCollector for a single organization.
|
||||
type Aggregator interface {
|
||||
Aggregate(ctx context.Context, orgID valuer.UUID) (map[string]any, error)
|
||||
}
|
||||
|
||||
type aggregator struct {
|
||||
// settings
|
||||
settings factory.ScopedProviderSettings
|
||||
|
||||
// a list of collectors, used to collect stats from across the codebase
|
||||
collectors []StatsCollector
|
||||
}
|
||||
|
||||
func NewAggregator(providerSettings factory.ProviderSettings, collectors []StatsCollector) Aggregator {
|
||||
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/statsreporter")
|
||||
|
||||
return &aggregator{
|
||||
settings: settings,
|
||||
collectors: collectors,
|
||||
}
|
||||
}
|
||||
|
||||
func (aggregator *aggregator) Aggregate(ctx context.Context, orgID valuer.UUID) (map[string]any, error) {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.CodeNamespace: "statsreporter",
|
||||
instrumentationtypes.CodeFunctionName: "Aggregate",
|
||||
})
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(aggregator.collectors))
|
||||
|
||||
stats := make(map[string]any, 0)
|
||||
mtx := sync.Mutex{}
|
||||
|
||||
for _, collector := range aggregator.collectors {
|
||||
go func(collector StatsCollector) {
|
||||
defer wg.Done()
|
||||
|
||||
collectorStats, err := collector.Collect(ctx, orgID)
|
||||
if err != nil {
|
||||
aggregator.settings.Logger().ErrorContext(ctx, "failed to collect stats", errors.Attr(err))
|
||||
return
|
||||
}
|
||||
|
||||
mtx.Lock()
|
||||
for k, v := range collectorStats {
|
||||
stats[k] = v
|
||||
}
|
||||
mtx.Unlock()
|
||||
}(collector)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
@@ -3,7 +3,6 @@ package analyticsstatsreporter
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
@@ -16,11 +15,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/organization"
|
||||
"github.com/SigNoz/signoz/pkg/modules/user"
|
||||
"github.com/SigNoz/signoz/pkg/statsreporter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/tokenizer"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/SigNoz/signoz/pkg/version"
|
||||
)
|
||||
@@ -32,11 +28,8 @@ type provider struct {
|
||||
// config
|
||||
config statsreporter.Config
|
||||
|
||||
// used to get telemetry details. srikanthcvv to move this to the querier layer
|
||||
telemetryStore telemetrystore.TelemetryStore
|
||||
|
||||
// a list of collectors, used to collect stats from across the codebase
|
||||
collectors []statsreporter.StatsCollector
|
||||
// used to aggregate stats for an organization
|
||||
aggregator statsreporter.Aggregator
|
||||
|
||||
// used to get organizations
|
||||
orgGetter organization.Getter
|
||||
@@ -60,9 +53,9 @@ type provider struct {
|
||||
stopC chan struct{}
|
||||
}
|
||||
|
||||
func NewFactory(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config] {
|
||||
func NewFactory(aggregator statsreporter.Aggregator, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("analytics"), func(ctx context.Context, settings factory.ProviderSettings, config statsreporter.Config) (statsreporter.StatsReporter, error) {
|
||||
return New(ctx, settings, config, telemetryStore, collectors, orgGetter, userGetter, tokenizer, build, analyticsConfig)
|
||||
return New(ctx, settings, config, aggregator, orgGetter, userGetter, tokenizer, build, analyticsConfig)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -70,8 +63,7 @@ func New(
|
||||
ctx context.Context,
|
||||
providerSettings factory.ProviderSettings,
|
||||
config statsreporter.Config,
|
||||
telemetryStore telemetrystore.TelemetryStore,
|
||||
collectors []statsreporter.StatsCollector,
|
||||
aggregator statsreporter.Aggregator,
|
||||
orgGetter organization.Getter,
|
||||
userGetter user.Getter,
|
||||
tokenizer tokenizer.Tokenizer,
|
||||
@@ -86,17 +78,16 @@ func New(
|
||||
}
|
||||
|
||||
return &provider{
|
||||
settings: settings,
|
||||
config: config,
|
||||
telemetryStore: telemetryStore,
|
||||
collectors: collectors,
|
||||
orgGetter: orgGetter,
|
||||
userGetter: userGetter,
|
||||
analytics: analytics,
|
||||
tokenizer: tokenizer,
|
||||
build: build,
|
||||
deployment: deployment,
|
||||
stopC: make(chan struct{}),
|
||||
settings: settings,
|
||||
config: config,
|
||||
aggregator: aggregator,
|
||||
orgGetter: orgGetter,
|
||||
userGetter: userGetter,
|
||||
analytics: analytics,
|
||||
tokenizer: tokenizer,
|
||||
build: build,
|
||||
deployment: deployment,
|
||||
stopC: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -134,7 +125,12 @@ func (provider *provider) Report(ctx context.Context) error {
|
||||
}
|
||||
|
||||
for _, org := range orgs {
|
||||
stats := provider.collectOrg(ctx, org.ID)
|
||||
stats, err := provider.aggregator.Aggregate(ctx, org.ID)
|
||||
if err != nil {
|
||||
provider.settings.Logger().WarnContext(ctx, "failed to aggregate stats", errors.Attr(err), slog.Any("org_id", org.ID))
|
||||
continue
|
||||
}
|
||||
|
||||
if len(stats) == 0 {
|
||||
provider.settings.Logger().WarnContext(ctx, "no stats collected", slog.Any("org_id", org.ID))
|
||||
continue
|
||||
@@ -204,75 +200,3 @@ func (provider *provider) Stop(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *provider) collectOrg(ctx context.Context, orgID valuer.UUID) map[string]any {
|
||||
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
|
||||
instrumentationtypes.CodeNamespace: "statsreporter",
|
||||
instrumentationtypes.CodeFunctionName: "collectOrg",
|
||||
})
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(provider.collectors))
|
||||
|
||||
stats := make(map[string]any, 0)
|
||||
mtx := sync.Mutex{}
|
||||
|
||||
for _, collector := range provider.collectors {
|
||||
go func(collector statsreporter.StatsCollector) {
|
||||
defer wg.Done()
|
||||
|
||||
collectorStats, err := collector.Collect(ctx, orgID)
|
||||
if err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to collect stats", errors.Attr(err))
|
||||
return
|
||||
}
|
||||
|
||||
mtx.Lock()
|
||||
for k, v := range collectorStats {
|
||||
stats[k] = v
|
||||
}
|
||||
mtx.Unlock()
|
||||
}(collector)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
var traces uint64
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_traces.distributed_signoz_index_v3").Scan(&traces); err == nil {
|
||||
stats["telemetry.traces.count"] = traces
|
||||
}
|
||||
|
||||
var logs uint64
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_logs.distributed_logs_v2").Scan(&logs); err == nil {
|
||||
stats["telemetry.logs.count"] = logs
|
||||
}
|
||||
|
||||
var metrics uint64
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_metrics.distributed_samples_v4").Scan(&metrics); err == nil {
|
||||
stats["telemetry.metrics.count"] = metrics
|
||||
}
|
||||
|
||||
var tracesLastSeenAt time.Time
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT max(timestamp) FROM signoz_traces.distributed_signoz_index_v3").Scan(&tracesLastSeenAt); err == nil {
|
||||
if tracesLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.traces.last_observed.time"] = tracesLastSeenAt.UTC()
|
||||
stats["telemetry.traces.last_observed.time_unix"] = tracesLastSeenAt.Unix()
|
||||
}
|
||||
}
|
||||
|
||||
var logsLastSeenAt time.Time
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT fromUnixTimestamp64Nano(max(timestamp)) FROM signoz_logs.distributed_logs_v2").Scan(&logsLastSeenAt); err == nil {
|
||||
if logsLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.logs.last_observed.time"] = logsLastSeenAt.UTC()
|
||||
stats["telemetry.logs.last_observed.time_unix"] = logsLastSeenAt.Unix()
|
||||
}
|
||||
}
|
||||
|
||||
var metricsLastSeenAt time.Time
|
||||
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT toDateTime(max(unix_milli) / 1000) FROM signoz_metrics.distributed_samples_v4").Scan(&metricsLastSeenAt); err == nil {
|
||||
if metricsLastSeenAt.Unix() != 0 {
|
||||
stats["telemetry.metrics.last_observed.time"] = metricsLastSeenAt.UTC()
|
||||
stats["telemetry.metrics.last_observed.time_unix"] = metricsLastSeenAt.Unix()
|
||||
}
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
50
pkg/statsreporter/handler.go
Normal file
50
pkg/statsreporter/handler.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package statsreporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Handler interface {
|
||||
Get(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
aggregator Aggregator
|
||||
}
|
||||
|
||||
func NewHandler(aggregator Aggregator) Handler {
|
||||
return &handler{
|
||||
aggregator: aggregator,
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *handler) Get(rw http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID, err := valuer.NewUUID(claims.OrgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
stats, err := handler.aggregator.Aggregate(ctx, orgID)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, stats)
|
||||
}
|
||||
@@ -1,160 +0,0 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"go.opentelemetry.io/collector/pdata/ptrace"
|
||||
)
|
||||
|
||||
var resourceSourcePrefix = FieldContextResource.StringValue() + "."
|
||||
|
||||
var ErrCodeMappingPreviewFailed = errors.MustNewCode("span_attribute_mapping_preview_failed")
|
||||
|
||||
type SpanMappingPreviewSpan struct {
|
||||
ResourceAttributes map[string]any `json:"resourceAttributes" nullable:"true"`
|
||||
SpanAttributes map[string]any `json:"spanAttributes" nullable:"true"`
|
||||
}
|
||||
|
||||
type SpanMappingPreviewRequest struct {
|
||||
Attributes *SpanMappingPreviewSpan `json:"attributes" nullable:"true"`
|
||||
OtlpTraces map[string]any `json:"otlpTraces" nullable:"true"`
|
||||
GroupID *string `json:"groupId" nullable:"true"`
|
||||
}
|
||||
|
||||
type SpanMappingPreviewResponse struct {
|
||||
Attributes *SpanMappingPreviewSpan `json:"attributes,omitempty" nullable:"true"`
|
||||
OtlpTraces map[string]any `json:"otlpTraces,omitempty" nullable:"true"`
|
||||
}
|
||||
|
||||
func SimulateSpanMapping(groups []*SpanMapperGroupWithMappers, resourceAttrs, spanAttrs map[string]any) (outResource, outSpan map[string]any) {
|
||||
cfg := buildProcessorConfig(filterEnabledGroupsWithMappers(groups))
|
||||
|
||||
outResource = cloneAttrs(resourceAttrs)
|
||||
outSpan = cloneAttrs(spanAttrs)
|
||||
|
||||
applyEnabledGroups(cfg, outSpan, outResource)
|
||||
return outResource, outSpan
|
||||
}
|
||||
|
||||
func SimulateSpanMappingOTLP(groups []*SpanMapperGroupWithMappers, otlp []byte) ([]byte, error) {
|
||||
td, err := (&ptrace.JSONUnmarshaler{}).UnmarshalTraces(otlp)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInvalidInputf(err, ErrCodeMappingInvalidInput, "invalid OTLP traces payload")
|
||||
}
|
||||
|
||||
cfg := buildProcessorConfig(filterEnabledGroupsWithMappers(groups))
|
||||
|
||||
rss := td.ResourceSpans()
|
||||
for i := 0; i < rss.Len(); i++ {
|
||||
rs := rss.At(i)
|
||||
resourceAttrs := rs.Resource().Attributes().AsRaw()
|
||||
|
||||
scopeSpans := rs.ScopeSpans()
|
||||
for j := 0; j < scopeSpans.Len(); j++ {
|
||||
spans := scopeSpans.At(j).Spans()
|
||||
for k := 0; k < spans.Len(); k++ {
|
||||
span := spans.At(k)
|
||||
spanAttrs := span.Attributes().AsRaw()
|
||||
applyEnabledGroups(cfg, spanAttrs, resourceAttrs)
|
||||
if err := span.Attributes().FromRaw(spanAttrs); err != nil {
|
||||
return nil, errors.WrapInternalf(err, ErrCodeMappingPreviewFailed, "could not write transformed span attributes")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := rs.Resource().Attributes().FromRaw(resourceAttrs); err != nil {
|
||||
return nil, errors.WrapInternalf(err, ErrCodeMappingPreviewFailed, "could not write transformed resource attributes")
|
||||
}
|
||||
}
|
||||
|
||||
out, err := (&ptrace.JSONMarshaler{}).MarshalTraces(td)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, ErrCodeMappingPreviewFailed, "could not marshal transformed traces")
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// TODO(spanmapper-preview): the apply logic below is a temporary copy of the signozspanmapper processor — remove it and call the real processor once signoz-otel-collector#796 is merged.
|
||||
func applyEnabledGroups(cfg *spanMapperProcessorConfig, spanAttrs, resourceAttrs map[string]any) {
|
||||
for i := range cfg.Groups {
|
||||
g := &cfg.Groups[i]
|
||||
if !spanMapperConditionMet(g.ExistsAny, spanAttrs, resourceAttrs) {
|
||||
continue
|
||||
}
|
||||
for j := range g.Attributes {
|
||||
applySpanMapperRule(&g.Attributes[j], spanAttrs, resourceAttrs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// filterEnabledGroupsWithMappers keeps only enabled groups and their enabled
|
||||
// mappers, dropping groups left with no enabled mappers.
|
||||
func filterEnabledGroupsWithMappers(groups []*SpanMapperGroupWithMappers) []*SpanMapperGroupWithMappers {
|
||||
out := make([]*SpanMapperGroupWithMappers, 0, len(groups))
|
||||
for _, gm := range groups {
|
||||
if gm == nil || gm.Group == nil || !gm.Group.Enabled {
|
||||
continue
|
||||
}
|
||||
enabled := make([]*SpanMapper, 0, len(gm.Mappers))
|
||||
for _, m := range gm.Mappers {
|
||||
if m != nil && m.Enabled {
|
||||
enabled = append(enabled, m)
|
||||
}
|
||||
}
|
||||
if len(enabled) > 0 {
|
||||
out = append(out, &SpanMapperGroupWithMappers{Group: gm.Group, Mappers: enabled})
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func spanMapperConditionMet(cond spanMapperProcessorExistsAny, spanAttrs, resourceAttrs map[string]any) bool {
|
||||
return anyKeyContains(spanAttrs, cond.Attributes) || anyKeyContains(resourceAttrs, cond.Resource)
|
||||
}
|
||||
|
||||
func anyKeyContains(attrs map[string]any, patterns []string) bool {
|
||||
for k := range attrs {
|
||||
for _, p := range patterns {
|
||||
if strings.Contains(k, p) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func applySpanMapperRule(rule *spanMapperProcessorAttribute, spanAttrs, resourceAttrs map[string]any) {
|
||||
dst := spanAttrs
|
||||
if rule.Context == FieldContextResource.StringValue() {
|
||||
dst = resourceAttrs
|
||||
}
|
||||
|
||||
for i := range rule.Sources {
|
||||
src := &rule.Sources[i]
|
||||
bare, isResource := strings.CutPrefix(src.Key, resourceSourcePrefix)
|
||||
|
||||
from := spanAttrs
|
||||
if isResource {
|
||||
from = resourceAttrs
|
||||
}
|
||||
val, ok := from[bare]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
dst[rule.Target] = val
|
||||
if src.Action == SpanMapperOperationMove.StringValue() {
|
||||
delete(from, bare)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func cloneAttrs(in map[string]any) map[string]any {
|
||||
out := make(map[string]any, len(in))
|
||||
for k, v := range in {
|
||||
out[k] = v
|
||||
}
|
||||
return out
|
||||
}
|
||||
@@ -1,192 +0,0 @@
|
||||
package spantypes
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/collector/pdata/ptrace"
|
||||
)
|
||||
|
||||
func simGroup(name string, attrCond, resCond []string, mappers ...*SpanMapper) *SpanMapperGroupWithMappers {
|
||||
return &SpanMapperGroupWithMappers{
|
||||
Group: &SpanMapperGroup{
|
||||
Name: name,
|
||||
Condition: SpanMapperGroupCondition{Attributes: attrCond, Resource: resCond},
|
||||
Enabled: true,
|
||||
},
|
||||
Mappers: mappers,
|
||||
}
|
||||
}
|
||||
|
||||
func simMapper(target string, ctx FieldContext, sources ...SpanMapperSource) *SpanMapper {
|
||||
return &SpanMapper{
|
||||
Name: target,
|
||||
FieldContext: ctx,
|
||||
Config: SpanMapperConfig{Sources: sources},
|
||||
Enabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
func simAttrSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
|
||||
return SpanMapperSource{Key: key, Context: FieldContextSpanAttribute, Operation: op, Priority: priority}
|
||||
}
|
||||
|
||||
func simResSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
|
||||
return SpanMapperSource{Key: key, Context: FieldContextResource, Operation: op, Priority: priority}
|
||||
}
|
||||
|
||||
func TestSimulate_MatchInSpanAttrs(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("llm", []string{"model"}, nil,
|
||||
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
|
||||
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"llm.model": "gpt-4", "gen_ai.llm.model": "gpt-40"})
|
||||
|
||||
assert.Equal(t, "gpt-4", outSpan["gen_ai.request.model"])
|
||||
}
|
||||
|
||||
func TestSimulate_MatchInResourceAttrs(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("llm", nil, []string{"service.name"},
|
||||
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
|
||||
simResSrc("service.name", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, map[string]any{"service.name": "my-llm-service"}, nil)
|
||||
|
||||
assert.Equal(t, "my-llm-service", outSpan["gen_ai.request.model"])
|
||||
}
|
||||
|
||||
func TestSimulate_NoMatchSkipsGroup(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("llm", []string{"model"}, nil,
|
||||
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
|
||||
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"some.other.key": "value"})
|
||||
|
||||
_, ok := outSpan["gen_ai.request.model"]
|
||||
assert.False(t, ok, "target must not be set when condition is not met")
|
||||
}
|
||||
|
||||
func TestSimulate_SourceFirstMatchWins(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("tokens", []string{"llm"}, nil,
|
||||
simMapper("gen_ai.request.tokens", FieldContextSpanAttribute,
|
||||
simAttrSrc("gen_ai.request_tokens", SpanMapperOperationCopy, 2),
|
||||
simAttrSrc("llm.tokens", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"gen_ai.request_tokens": "100", "llm.tokens": "200"})
|
||||
|
||||
assert.Equal(t, "100", outSpan["gen_ai.request.tokens"])
|
||||
}
|
||||
|
||||
func TestSimulate_SourceFallsBackToSecond(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("tokens", []string{"llm"}, nil,
|
||||
simMapper("gen_ai.request.tokens", FieldContextSpanAttribute,
|
||||
simAttrSrc("gen_ai.request_tokens", SpanMapperOperationCopy, 2),
|
||||
simAttrSrc("llm.tokens", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"llm.tokens": "200"})
|
||||
|
||||
assert.Equal(t, "200", outSpan["gen_ai.request.tokens"])
|
||||
}
|
||||
|
||||
func TestSimulate_ActionMove(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("input", []string{"gen_ai"}, nil,
|
||||
simMapper("gen_ai.request.input", FieldContextSpanAttribute,
|
||||
simAttrSrc("gen_ai.input", SpanMapperOperationMove, 1)),
|
||||
),
|
||||
}
|
||||
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"gen_ai.input": "hello"})
|
||||
|
||||
assert.Equal(t, "hello", outSpan["gen_ai.request.input"])
|
||||
_, srcPresent := outSpan["gen_ai.input"]
|
||||
assert.False(t, srcPresent, "source key must be deleted when action=move")
|
||||
}
|
||||
|
||||
func TestSimulate_WriteToResourceContext(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("llm", []string{"llm"}, nil,
|
||||
simMapper("gen_ai.request.model", FieldContextResource,
|
||||
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
|
||||
),
|
||||
}
|
||||
outResource, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"llm.model": "gpt-4"})
|
||||
|
||||
assert.Equal(t, "gpt-4", outResource["gen_ai.request.model"], "target must be written to resource attributes")
|
||||
_, inSpan := outSpan["gen_ai.request.model"]
|
||||
assert.False(t, inSpan)
|
||||
}
|
||||
|
||||
func TestSimulate_DisabledGroupsAndMappersSkipped(t *testing.T) {
|
||||
disabledGroup := simGroup("g1", []string{"llm"}, nil,
|
||||
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
|
||||
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)))
|
||||
disabledGroup.Group.Enabled = false
|
||||
|
||||
_, outSpan := SimulateSpanMapping([]*SpanMapperGroupWithMappers{disabledGroup}, nil, map[string]any{"llm.model": "gpt-4"})
|
||||
|
||||
_, ok := outSpan["gen_ai.request.model"]
|
||||
assert.False(t, ok, "disabled groups must not be evaluated")
|
||||
}
|
||||
|
||||
func TestSimulate_DoesNotMutateInput(t *testing.T) {
|
||||
input := map[string]any{"gen_ai.input": "hi"}
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("input", []string{"gen_ai"}, nil,
|
||||
simMapper("gen_ai.request.input", FieldContextSpanAttribute,
|
||||
simAttrSrc("gen_ai.input", SpanMapperOperationMove, 1))),
|
||||
}
|
||||
_, _ = SimulateSpanMapping(groups, nil, input)
|
||||
|
||||
// Original input map must be untouched (move would have deleted the key).
|
||||
_, ok := input["gen_ai.input"]
|
||||
assert.True(t, ok, "input map must not be mutated by the preview")
|
||||
}
|
||||
|
||||
func TestSimulateOTLP_TransformsSpan(t *testing.T) {
|
||||
groups := []*SpanMapperGroupWithMappers{
|
||||
simGroup("llm", []string{"model"}, nil,
|
||||
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
|
||||
simAttrSrc("llm.model", SpanMapperOperationMove, 1)),
|
||||
),
|
||||
}
|
||||
|
||||
otlp := []byte(`{
|
||||
"resourceSpans": [{
|
||||
"resource": {"attributes": [{"key": "service.name", "value": {"stringValue": "checkout"}}]},
|
||||
"scopeSpans": [{
|
||||
"spans": [{"attributes": [{"key": "llm.model", "value": {"stringValue": "gpt-4"}}]}]
|
||||
}]
|
||||
}]
|
||||
}`)
|
||||
|
||||
out, err := SimulateSpanMappingOTLP(groups, otlp)
|
||||
require.NoError(t, err)
|
||||
|
||||
td, err := (&ptrace.JSONUnmarshaler{}).UnmarshalTraces(out)
|
||||
require.NoError(t, err)
|
||||
|
||||
span := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
|
||||
v, ok := span.Attributes().Get("gen_ai.request.model")
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, "gpt-4", v.Str())
|
||||
|
||||
// move must have deleted the source.
|
||||
_, srcPresent := span.Attributes().Get("llm.model")
|
||||
assert.False(t, srcPresent)
|
||||
}
|
||||
|
||||
func TestSimulateOTLP_Invalid(t *testing.T) {
|
||||
_, err := SimulateSpanMappingOTLP(nil, []byte(`{ not json`))
|
||||
assert.Error(t, err)
|
||||
}
|
||||
Reference in New Issue
Block a user