Compare commits

..

3 Commits

Author SHA1 Message Date
grandwizard28
9237e25ef8 chore: regenerate openapi spec and frontend client
Backend docs/api/openapi.yml gains the GET /api/v1/stats (GetStats)
operation; the Orval client gains a useGetStats hook and GetStats200
type.
2026-06-12 16:16:47 +05:30
grandwizard28
867d626b27 refactor(statsreporter): collect telemetry stats via the querier
Move the trace/log/metric row-count and last-observed queries out of the
stats aggregator and into the querier, which now implements
statsreporter.StatsCollector. The aggregator becomes a pure collector
fan-out and no longer depends on telemetrystore; the querier is wired in
as one of the stats collectors.
2026-06-12 16:03:48 +05:30
grandwizard28
0769ac0e7d feat(statsreporter): expose collected stats via GET /api/v1/stats
Extract per-org stats collection out of the analytics reporter into an
always-on Aggregator (collector fan-out + telemetry-store counts) shared
by the reporter and a new HTTP handler. The GET /api/v1/stats endpoint
returns the caller's org stats regardless of whether scheduled reporting
is enabled.
2026-06-12 15:36:59 +05:30
23 changed files with 390 additions and 788 deletions

View File

@@ -7185,39 +7185,6 @@ components:
- operation
- priority
type: object
SpantypesSpanMappingPreviewRequest:
properties:
attributes:
$ref: '#/components/schemas/SpantypesSpanMappingPreviewSpan'
groupId:
nullable: true
type: string
otlpTraces:
additionalProperties: {}
nullable: true
type: object
type: object
SpantypesSpanMappingPreviewResponse:
properties:
attributes:
$ref: '#/components/schemas/SpantypesSpanMappingPreviewSpan'
otlpTraces:
additionalProperties: {}
nullable: true
type: object
type: object
SpantypesSpanMappingPreviewSpan:
nullable: true
properties:
resourceAttributes:
additionalProperties: {}
nullable: true
type: object
spanAttributes:
additionalProperties: {}
nullable: true
type: object
type: object
SpantypesUpdatableSpanMapper:
properties:
config:
@@ -12823,17 +12790,11 @@ paths:
summary: Update a span mapper
tags:
- spanmapper
/api/v1/span_mapper_groups/preview:
post:
/api/v1/stats:
get:
deprecated: false
description: Previews how the org's saved attribute mappings would transform
a sample span.
operationId: PreviewSpanMapping
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesSpanMappingPreviewRequest'
description: This endpoint returns the collected stats for the organization
operationId: GetStats
responses:
"200":
content:
@@ -12841,7 +12802,8 @@ paths:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesSpanMappingPreviewResponse'
additionalProperties: {}
type: object
status:
type: string
required:
@@ -12849,12 +12811,6 @@ paths:
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
@@ -12867,12 +12823,6 @@ paths:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
@@ -12884,9 +12834,9 @@ paths:
- VIEWER
- tokenizer:
- VIEWER
summary: Preview span attribute mapping against a sample span
summary: Get stats
tags:
- spanmapper
- stats
/api/v1/testChannel:
post:
deprecated: true

View File

@@ -8440,83 +8440,6 @@ export interface SpantypesSpanMapperDTO {
updatedBy?: string;
}
export type SpantypesSpanMappingPreviewRequestDTOOtlpTracesAnyOf = {
[key: string]: unknown;
};
/**
* @nullable
*/
export type SpantypesSpanMappingPreviewRequestDTOOtlpTraces =
SpantypesSpanMappingPreviewRequestDTOOtlpTracesAnyOf | null;
export type SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributesAnyOf = {
[key: string]: unknown;
};
/**
* @nullable
*/
export type SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributes =
SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributesAnyOf | null;
export type SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributesAnyOf = {
[key: string]: unknown;
};
/**
* @nullable
*/
export type SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributes =
SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributesAnyOf | null;
export type SpantypesSpanMappingPreviewSpanDTOAnyOf = {
/**
* @type object,null
*/
resourceAttributes?: SpantypesSpanMappingPreviewSpanDTOAnyOfResourceAttributes;
/**
* @type object,null
*/
spanAttributes?: SpantypesSpanMappingPreviewSpanDTOAnyOfSpanAttributes;
};
/**
* @nullable
*/
export type SpantypesSpanMappingPreviewSpanDTO =
SpantypesSpanMappingPreviewSpanDTOAnyOf | null;
export interface SpantypesSpanMappingPreviewRequestDTO {
attributes?: SpantypesSpanMappingPreviewSpanDTO | null;
/**
* @type string,null
*/
groupId?: string | null;
/**
* @type object,null
*/
otlpTraces?: SpantypesSpanMappingPreviewRequestDTOOtlpTraces;
}
export type SpantypesSpanMappingPreviewResponseDTOOtlpTracesAnyOf = {
[key: string]: unknown;
};
/**
* @nullable
*/
export type SpantypesSpanMappingPreviewResponseDTOOtlpTraces =
SpantypesSpanMappingPreviewResponseDTOOtlpTracesAnyOf | null;
export interface SpantypesSpanMappingPreviewResponseDTO {
attributes?: SpantypesSpanMappingPreviewSpanDTO | null;
/**
* @type object,null
*/
otlpTraces?: SpantypesSpanMappingPreviewResponseDTOOtlpTraces;
}
export interface SpantypesUpdatableSpanMapperDTO {
config?: SpantypesSpanMapperConfigDTO;
/**
@@ -9823,8 +9746,13 @@ export type UpdateSpanMapperPathParameters = {
groupId: string;
mapperId: string;
};
export type PreviewSpanMapping200 = {
data: SpantypesSpanMappingPreviewResponseDTO;
export type GetStats200Data = { [key: string]: unknown };
export type GetStats200 = {
/**
* @type object
*/
data: GetStats200Data;
/**
* @type string
*/

View File

@@ -27,11 +27,9 @@ import type {
ListSpanMapperGroupsParams,
ListSpanMappers200,
ListSpanMappersPathParameters,
PreviewSpanMapping200,
RenderErrorResponseDTO,
SpantypesPostableSpanMapperDTO,
SpantypesPostableSpanMapperGroupDTO,
SpantypesSpanMappingPreviewRequestDTO,
SpantypesUpdatableSpanMapperDTO,
SpantypesUpdatableSpanMapperGroupDTO,
UpdateSpanMapperGroupPathParameters,
@@ -782,86 +780,3 @@ export const useUpdateSpanMapper = <
> => {
return useMutation(getUpdateSpanMapperMutationOptions(options));
};
/**
* Previews how the org's saved attribute mappings would transform a sample span.
* @summary Preview span attribute mapping against a sample span
*/
export const previewSpanMapping = (
spantypesSpanMappingPreviewRequestDTO?: BodyType<SpantypesSpanMappingPreviewRequestDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<PreviewSpanMapping200>({
url: `/api/v1/span_mapper_groups/preview`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: spantypesSpanMappingPreviewRequestDTO,
signal,
});
};
export const getPreviewSpanMappingMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof previewSpanMapping>>,
TError,
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof previewSpanMapping>>,
TError,
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
TContext
> => {
const mutationKey = ['previewSpanMapping'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof previewSpanMapping>>,
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> }
> = (props) => {
const { data } = props ?? {};
return previewSpanMapping(data);
};
return { mutationFn, ...mutationOptions };
};
export type PreviewSpanMappingMutationResult = NonNullable<
Awaited<ReturnType<typeof previewSpanMapping>>
>;
export type PreviewSpanMappingMutationBody =
| BodyType<SpantypesSpanMappingPreviewRequestDTO>
| undefined;
export type PreviewSpanMappingMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Preview span attribute mapping against a sample span
*/
export const usePreviewSpanMapping = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof previewSpanMapping>>,
TError,
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof previewSpanMapping>>,
TError,
{ data?: BodyType<SpantypesSpanMappingPreviewRequestDTO> },
TContext
> => {
return useMutation(getPreviewSpanMappingMutationOptions(options));
};

View File

@@ -0,0 +1,96 @@
/**
* ! Do not edit manually
* * The file has been auto-generated using Orval for SigNoz
* * regenerate with 'pnpm generate:api'
* SigNoz
*/
import { useQuery } from 'react-query';
import type {
InvalidateOptions,
QueryClient,
QueryFunction,
QueryKey,
UseQueryOptions,
UseQueryResult,
} from 'react-query';
import type { GetStats200, RenderErrorResponseDTO } from '../sigNoz.schemas';
import { GeneratedAPIInstance } from '../../../generatedAPIInstance';
import type { ErrorType } from '../../../generatedAPIInstance';
/**
* This endpoint returns the collected stats for the organization
* @summary Get stats
*/
export const getStats = (signal?: AbortSignal) => {
return GeneratedAPIInstance<GetStats200>({
url: `/api/v1/stats`,
method: 'GET',
signal,
});
};
export const getGetStatsQueryKey = () => {
return [`/api/v1/stats`] as const;
};
export const getGetStatsQueryOptions = <
TData = Awaited<ReturnType<typeof getStats>>,
TError = ErrorType<RenderErrorResponseDTO>,
>(options?: {
query?: UseQueryOptions<Awaited<ReturnType<typeof getStats>>, TError, TData>;
}) => {
const { query: queryOptions } = options ?? {};
const queryKey = queryOptions?.queryKey ?? getGetStatsQueryKey();
const queryFn: QueryFunction<Awaited<ReturnType<typeof getStats>>> = ({
signal,
}) => getStats(signal);
return { queryKey, queryFn, ...queryOptions } as UseQueryOptions<
Awaited<ReturnType<typeof getStats>>,
TError,
TData
> & { queryKey: QueryKey };
};
export type GetStatsQueryResult = NonNullable<
Awaited<ReturnType<typeof getStats>>
>;
export type GetStatsQueryError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Get stats
*/
export function useGetStats<
TData = Awaited<ReturnType<typeof getStats>>,
TError = ErrorType<RenderErrorResponseDTO>,
>(options?: {
query?: UseQueryOptions<Awaited<ReturnType<typeof getStats>>, TError, TData>;
}): UseQueryResult<TData, TError> & { queryKey: QueryKey } {
const queryOptions = getGetStatsQueryOptions(options);
const query = useQuery(queryOptions) as UseQueryResult<TData, TError> & {
queryKey: QueryKey;
};
return { ...query, queryKey: queryOptions.queryKey };
}
/**
* @summary Get stats
*/
export const invalidateGetStats = async (
queryClient: QueryClient,
options?: InvalidateOptions,
): Promise<QueryClient> => {
await queryClient.invalidateQueries(
{ queryKey: getGetStatsQueryKey() },
options,
);
return queryClient;
};

View File

@@ -31,6 +31,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/zeus"
@@ -70,6 +71,7 @@ type provider struct {
traceDetailHandler tracedetail.Handler
rulerHandler ruler.Handler
llmPricingRuleHandler llmpricingrule.Handler
statsHandler statsreporter.Handler
}
func NewFactory(
@@ -102,6 +104,7 @@ func NewFactory(
llmPricingRuleHandler llmpricingrule.Handler,
traceDetailHandler tracedetail.Handler,
rulerHandler ruler.Handler,
statsHandler statsreporter.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
return newProvider(
@@ -137,6 +140,7 @@ func NewFactory(
llmPricingRuleHandler,
traceDetailHandler,
rulerHandler,
statsHandler,
)
})
}
@@ -174,6 +178,7 @@ func newProvider(
llmPricingRuleHandler llmpricingrule.Handler,
traceDetailHandler tracedetail.Handler,
rulerHandler ruler.Handler,
statsHandler statsreporter.Handler,
) (apiserver.APIServer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
router := mux.NewRouter().UseEncodedPath()
@@ -210,6 +215,7 @@ func newProvider(
traceDetailHandler: traceDetailHandler,
rulerHandler: rulerHandler,
llmPricingRuleHandler: llmPricingRuleHandler,
statsHandler: statsHandler,
}
provider.authzMiddleware = middleware.NewAuthZ(settings.Logger(), orgGetter, authzService)
@@ -334,6 +340,10 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addStatsReporterRoutes(router); err != nil {
return err
}
return nil
}

View File

@@ -51,26 +51,6 @@ func (provider *provider) addSpanMapperRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/preview", handler.New(
provider.authzMiddleware.ViewAccess(provider.spanMapperHandler.PreviewMapping),
handler.OpenAPIDef{
ID: "PreviewSpanMapping",
Tags: []string{"spanmapper"},
Summary: "Preview span attribute mapping against a sample span",
Description: "Previews how the org's saved attribute mappings would transform a sample span.",
Request: new(spantypes.SpanMappingPreviewRequest),
RequestContentType: "application/json",
Response: new(spantypes.SpanMappingPreviewResponse),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}", handler.New(
provider.authzMiddleware.AdminAccess(provider.spanMapperHandler.UpdateGroup),
handler.OpenAPIDef{

View File

@@ -0,0 +1,33 @@
package signozapiserver
import (
"net/http"
"github.com/SigNoz/signoz/pkg/http/handler"
"github.com/SigNoz/signoz/pkg/types"
"github.com/gorilla/mux"
)
func (provider *provider) addStatsReporterRoutes(router *mux.Router) error {
if err := router.Handle("/api/v1/stats", handler.New(
provider.authzMiddleware.ViewAccess(provider.statsHandler.Get),
handler.OpenAPIDef{
ID: "GetStats",
Tags: []string{"stats"},
Summary: "Get stats",
Description: "This endpoint returns the collected stats for the organization",
Request: nil,
RequestContentType: "",
Response: map[string]any{},
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodGet).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -273,35 +273,6 @@ func (h *handler) DeleteMapper(rw http.ResponseWriter, r *http.Request) {
render.Success(rw, http.StatusNoContent, nil)
}
// PreviewMapping handles POST /api/v1/span_mapper_groups/preview.
// used to get preview of attributes after remapping.
func (h *handler) PreviewMapping(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
req := new(spantypes.SpanMappingPreviewRequest)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.PreviewMapping(ctx, orgID, req)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}
// groupIDFromPath extracts and validates the {id} or {groupId} path variable.
func groupIDFromPath(r *http.Request) (valuer.UUID, error) {
vars := mux.Vars(r)

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
@@ -103,64 +102,6 @@ func (module *module) DeleteMapper(ctx context.Context, orgID, groupID, id value
return nil
}
// PreviewMapping resolves the org's saved mappings for a sample input
// and returns the transformed result.
func (module *module) PreviewMapping(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) (*spantypes.SpanMappingPreviewResponse, error) {
groups, err := module.resolvePreviewGroups(ctx, orgID, req)
if err != nil {
return nil, err
}
hasAttrs := req.Attributes != nil
hasOTLP := len(req.OtlpTraces) > 0
if hasAttrs == hasOTLP {
return nil, errors.New(errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "exactly one of 'attributes' or 'otlpTraces' must be provided")
}
if hasAttrs {
outResource, outSpan := spantypes.SimulateSpanMapping(groups, req.Attributes.ResourceAttributes, req.Attributes.SpanAttributes)
return &spantypes.SpanMappingPreviewResponse{
Attributes: &spantypes.SpanMappingPreviewSpan{ResourceAttributes: outResource, SpanAttributes: outSpan},
}, nil
}
in, err := json.Marshal(req.OtlpTraces)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "could not serialize otlpTraces payload")
}
out, err := spantypes.SimulateSpanMappingOTLP(groups, in)
if err != nil {
return nil, err
}
var transformed map[string]any
if err := json.Unmarshal(out, &transformed); err != nil {
return nil, errors.WrapInternalf(err, spantypes.ErrCodeMappingPreviewFailed, "could not deserialize transformed traces")
}
return &spantypes.SpanMappingPreviewResponse{OtlpTraces: transformed}, nil
}
// resolvePreviewGroups resolves the config to preview against a specific saved
// group when GroupID is set, otherwise all the org's enabled saved mappings.
func (module *module) resolvePreviewGroups(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) ([]*spantypes.SpanMapperGroupWithMappers, error) {
if req.GroupID != nil && *req.GroupID != "" {
id, err := valuer.NewUUID(*req.GroupID)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "group id is not a valid uuid")
}
group, err := module.store.GetGroup(ctx, orgID, id)
if err != nil {
return nil, err
}
mappers, err := module.store.ListMappers(ctx, orgID, id)
if err != nil {
return nil, err
}
return []*spantypes.SpanMapperGroupWithMappers{{Group: group, Mappers: mappers}}, nil
}
return module.listEnabledGroupsWithMappers(ctx, orgID)
}
func (module *module) AgentFeatureType() agentConf.AgentFeatureType {
return spantypes.SpanAttrMappingFeatureType
}

View File

@@ -27,7 +27,6 @@ type Module interface {
CreateMapper(ctx context.Context, orgID, groupID valuer.UUID, mapper *spantypes.SpanMapper) error
UpdateMapper(ctx context.Context, orgID, groupID, id valuer.UUID, fieldContext spantypes.FieldContext, config *spantypes.SpanMapperConfig, enabled *bool, updatedBy string) error
DeleteMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error
PreviewMapping(ctx context.Context, orgID valuer.UUID, req *spantypes.SpanMappingPreviewRequest) (*spantypes.SpanMappingPreviewResponse, error)
}
// Handler defines the HTTP handler interface for mapping group and mapper endpoints.
@@ -43,5 +42,4 @@ type Handler interface {
CreateMapper(rw http.ResponseWriter, r *http.Request)
UpdateMapper(rw http.ResponseWriter, r *http.Request)
DeleteMapper(rw http.ResponseWriter, r *http.Request)
PreviewMapping(rw http.ResponseWriter, r *http.Request)
}

78
pkg/querier/collect.go Normal file
View File

@@ -0,0 +1,78 @@
package querier
import (
"context"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Collect implements statsreporter.StatsCollector. It returns telemetry counts and last-observed
// times for traces, logs and metrics. orgID is unused because the underlying tables are
// cluster-global. Collection is best-effort: a failing query is logged and its key omitted rather
// than failing the whole collection.
func (q *querier) Collect(ctx context.Context, _ valuer.UUID) (map[string]any, error) {
stats := make(map[string]any)
tracesTable := fmt.Sprintf("%s.%s", telemetrytraces.DBName, telemetrytraces.SpanIndexV3TableName)
logsTable := fmt.Sprintf("%s.%s", telemetrylogs.DBName, telemetrylogs.LogsV2TableName)
metricsTable := fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.SamplesV4TableName)
var traces uint64
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", tracesTable)).Scan(&traces); err == nil {
stats["telemetry.traces.count"] = traces
} else {
q.logger.DebugContext(ctx, "failed to collect traces count", errors.Attr(err))
}
var logs uint64
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", logsTable)).Scan(&logs); err == nil {
stats["telemetry.logs.count"] = logs
} else {
q.logger.DebugContext(ctx, "failed to collect logs count", errors.Attr(err))
}
var metrics uint64
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", metricsTable)).Scan(&metrics); err == nil {
stats["telemetry.metrics.count"] = metrics
} else {
q.logger.DebugContext(ctx, "failed to collect metrics count", errors.Attr(err))
}
var tracesLastSeenAt time.Time
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT max(timestamp) FROM %s", tracesTable)).Scan(&tracesLastSeenAt); err == nil {
if tracesLastSeenAt.Unix() != 0 {
stats["telemetry.traces.last_observed.time"] = tracesLastSeenAt.UTC()
stats["telemetry.traces.last_observed.time_unix"] = tracesLastSeenAt.Unix()
}
} else {
q.logger.DebugContext(ctx, "failed to collect traces last observed", errors.Attr(err))
}
var logsLastSeenAt time.Time
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT fromUnixTimestamp64Nano(max(timestamp)) FROM %s", logsTable)).Scan(&logsLastSeenAt); err == nil {
if logsLastSeenAt.Unix() != 0 {
stats["telemetry.logs.last_observed.time"] = logsLastSeenAt.UTC()
stats["telemetry.logs.last_observed.time_unix"] = logsLastSeenAt.Unix()
}
} else {
q.logger.DebugContext(ctx, "failed to collect logs last observed", errors.Attr(err))
}
var metricsLastSeenAt time.Time
if err := q.telemetryStore.ClickhouseDB().QueryRow(ctx, fmt.Sprintf("SELECT toDateTime(max(unix_milli) / 1000) FROM %s", metricsTable)).Scan(&metricsLastSeenAt); err == nil {
if metricsLastSeenAt.Unix() != 0 {
stats["telemetry.metrics.last_observed.time"] = metricsLastSeenAt.UTC()
stats["telemetry.metrics.last_observed.time_unix"] = metricsLastSeenAt.Unix()
}
} else {
q.logger.DebugContext(ctx, "failed to collect metrics last observed", errors.Attr(err))
}
return stats, nil
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/statsreporter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -12,6 +13,7 @@ import (
type Querier interface {
QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error)
QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream)
statsreporter.StatsCollector
}
// BucketCache is the interface for bucket-based caching.

View File

@@ -49,6 +49,7 @@ import (
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
"github.com/SigNoz/signoz/pkg/ruler/signozruler"
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/zeus"
)
@@ -80,6 +81,7 @@ type Handlers struct {
TraceDetail tracedetail.Handler
RulerHandler ruler.Handler
LLMPricingRuleHandler llmpricingrule.Handler
StatsHandler statsreporter.Handler
}
func NewHandlers(
@@ -97,6 +99,7 @@ func NewHandlers(
registryHandler factory.Handler,
alertmanagerService alertmanager.Alertmanager,
rulerService ruler.Ruler,
statsAggregator statsreporter.Aggregator,
) Handlers {
return Handlers{
SavedView: implsavedview.NewHandler(modules.SavedView),
@@ -125,5 +128,6 @@ func NewHandlers(
TraceDetail: impltracedetail.NewHandler(modules.TraceDetail),
RulerHandler: signozruler.NewHandler(rulerService),
LLMPricingRuleHandler: impllmpricingrule.NewHandler(modules.LLMPricingRule),
StatsHandler: statsreporter.NewHandler(statsAggregator),
}
}

View File

@@ -63,7 +63,7 @@ func TestNewHandlers(t *testing.T) {
querierHandler := querier.NewHandler(providerSettings, nil, nil)
registryHandler := factory.NewHandler(nil)
handlers := NewHandlers(modules, providerSettings, nil, querierHandler, nil, nil, nil, nil, nil, nil, nil, registryHandler, alertmanager, nil)
handlers := NewHandlers(modules, providerSettings, nil, querierHandler, nil, nil, nil, nil, nil, nil, nil, registryHandler, alertmanager, nil, nil)
reflectVal := reflect.ValueOf(handlers)
for i := 0; i < reflectVal.NumField(); i++ {
f := reflectVal.Field(i)

View File

@@ -36,6 +36,7 @@ import (
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/zeus"
"github.com/swaggest/jsonschema-go"
@@ -83,6 +84,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ llmpricingrule.Handler }{},
struct{ tracedetail.Handler }{},
struct{ ruler.Handler }{},
struct{ statsreporter.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
return nil, err

View File

@@ -264,9 +264,9 @@ func NewSharderProviderFactories() factory.NamedMap[factory.ProviderFactory[shar
)
}
func NewStatsReporterProviderFactories(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.NamedMap[factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config]] {
func NewStatsReporterProviderFactories(aggregator statsreporter.Aggregator, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.NamedMap[factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config]] {
return factory.MustNewNamedMap(
analyticsstatsreporter.NewFactory(telemetryStore, collectors, orgGetter, userGetter, tokenizer, build, analyticsConfig),
analyticsstatsreporter.NewFactory(aggregator, orgGetter, userGetter, tokenizer, build, analyticsConfig),
noopstatsreporter.NewFactory(),
)
}
@@ -309,6 +309,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
handlers.LLMPricingRuleHandler,
handlers.TraceDetail,
handlers.RulerHandler,
handlers.StatsHandler,
),
)
}

View File

@@ -85,8 +85,8 @@ func TestNewProviderFactories(t *testing.T) {
userGetter := impluser.NewGetter(impluser.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), instrumentationtest.New().ToProviderSettings()), userRoleStore, flagger)
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherEqual)
NewStatsReporterProviderFactories(telemetryStore, []statsreporter.StatsCollector{}, orgGetter, userGetter, tokenizertest.NewMockTokenizer(t), version.Build{}, analytics.Config{Enabled: true})
statsAggregator := statsreporter.NewAggregator(providerSettings, []statsreporter.StatsCollector{})
NewStatsReporterProviderFactories(statsAggregator, orgGetter, userGetter, tokenizertest.NewMockTokenizer(t), version.Build{}, analytics.Config{Enabled: true})
})
assert.NotPanics(t, func() {

View File

@@ -499,14 +499,18 @@ func New(
serviceAccount,
cloudIntegrationModule,
modules.LogsPipeline,
querier,
}
// Initialize the stats aggregator (always-on, independent of whether reporting is enabled)
statsAggregator := statsreporter.NewAggregator(providerSettings, statsCollectors)
// Initialize stats reporter from the available stats reporter provider factories
statsReporter, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.StatsReporter,
NewStatsReporterProviderFactories(telemetrystore, statsCollectors, orgGetter, userGetter, tokenizer, version.Info, config.Analytics),
NewStatsReporterProviderFactories(statsAggregator, orgGetter, userGetter, tokenizer, version.Info, config.Analytics),
config.StatsReporter.Provider(),
)
if err != nil {
@@ -535,7 +539,7 @@ func New(
// Initialize all handlers for the modules
registryHandler := factory.NewHandler(registry)
handlers := NewHandlers(modules, providerSettings, analytics, querierHandler, licensing, global, flagger, gateway, telemetryMetadataStore, authz, zeus, registryHandler, alertmanager, rulerInstance)
handlers := NewHandlers(modules, providerSettings, analytics, querierHandler, licensing, global, flagger, gateway, telemetryMetadataStore, authz, zeus, registryHandler, alertmanager, rulerInstance, statsAggregator)
// Initialize the API server (after registry so it can access service health)
apiserverInstance, err := factory.NewProviderFromNamedMap(

View File

@@ -0,0 +1,67 @@
package statsreporter
import (
"context"
"sync"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Aggregator aggregates stats from every registered StatsCollector for a single organization.
type Aggregator interface {
Aggregate(ctx context.Context, orgID valuer.UUID) (map[string]any, error)
}
type aggregator struct {
// settings
settings factory.ScopedProviderSettings
// a list of collectors, used to collect stats from across the codebase
collectors []StatsCollector
}
func NewAggregator(providerSettings factory.ProviderSettings, collectors []StatsCollector) Aggregator {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/statsreporter")
return &aggregator{
settings: settings,
collectors: collectors,
}
}
func (aggregator *aggregator) Aggregate(ctx context.Context, orgID valuer.UUID) (map[string]any, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "statsreporter",
instrumentationtypes.CodeFunctionName: "Aggregate",
})
var wg sync.WaitGroup
wg.Add(len(aggregator.collectors))
stats := make(map[string]any, 0)
mtx := sync.Mutex{}
for _, collector := range aggregator.collectors {
go func(collector StatsCollector) {
defer wg.Done()
collectorStats, err := collector.Collect(ctx, orgID)
if err != nil {
aggregator.settings.Logger().ErrorContext(ctx, "failed to collect stats", errors.Attr(err))
return
}
mtx.Lock()
for k, v := range collectorStats {
stats[k] = v
}
mtx.Unlock()
}(collector)
}
wg.Wait()
return stats, nil
}

View File

@@ -3,7 +3,6 @@ package analyticsstatsreporter
import (
"context"
"log/slog"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
@@ -16,11 +15,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/tokenizer"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/version"
)
@@ -32,11 +28,8 @@ type provider struct {
// config
config statsreporter.Config
// used to get telemetry details. srikanthcvv to move this to the querier layer
telemetryStore telemetrystore.TelemetryStore
// a list of collectors, used to collect stats from across the codebase
collectors []statsreporter.StatsCollector
// used to aggregate stats for an organization
aggregator statsreporter.Aggregator
// used to get organizations
orgGetter organization.Getter
@@ -60,9 +53,9 @@ type provider struct {
stopC chan struct{}
}
func NewFactory(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config] {
func NewFactory(aggregator statsreporter.Aggregator, orgGetter organization.Getter, userGetter user.Getter, tokenizer tokenizer.Tokenizer, build version.Build, analyticsConfig analytics.Config) factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config] {
return factory.NewProviderFactory(factory.MustNewName("analytics"), func(ctx context.Context, settings factory.ProviderSettings, config statsreporter.Config) (statsreporter.StatsReporter, error) {
return New(ctx, settings, config, telemetryStore, collectors, orgGetter, userGetter, tokenizer, build, analyticsConfig)
return New(ctx, settings, config, aggregator, orgGetter, userGetter, tokenizer, build, analyticsConfig)
})
}
@@ -70,8 +63,7 @@ func New(
ctx context.Context,
providerSettings factory.ProviderSettings,
config statsreporter.Config,
telemetryStore telemetrystore.TelemetryStore,
collectors []statsreporter.StatsCollector,
aggregator statsreporter.Aggregator,
orgGetter organization.Getter,
userGetter user.Getter,
tokenizer tokenizer.Tokenizer,
@@ -86,17 +78,16 @@ func New(
}
return &provider{
settings: settings,
config: config,
telemetryStore: telemetryStore,
collectors: collectors,
orgGetter: orgGetter,
userGetter: userGetter,
analytics: analytics,
tokenizer: tokenizer,
build: build,
deployment: deployment,
stopC: make(chan struct{}),
settings: settings,
config: config,
aggregator: aggregator,
orgGetter: orgGetter,
userGetter: userGetter,
analytics: analytics,
tokenizer: tokenizer,
build: build,
deployment: deployment,
stopC: make(chan struct{}),
}, nil
}
@@ -134,7 +125,12 @@ func (provider *provider) Report(ctx context.Context) error {
}
for _, org := range orgs {
stats := provider.collectOrg(ctx, org.ID)
stats, err := provider.aggregator.Aggregate(ctx, org.ID)
if err != nil {
provider.settings.Logger().WarnContext(ctx, "failed to aggregate stats", errors.Attr(err), slog.Any("org_id", org.ID))
continue
}
if len(stats) == 0 {
provider.settings.Logger().WarnContext(ctx, "no stats collected", slog.Any("org_id", org.ID))
continue
@@ -204,75 +200,3 @@ func (provider *provider) Stop(ctx context.Context) error {
return nil
}
func (provider *provider) collectOrg(ctx context.Context, orgID valuer.UUID) map[string]any {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.CodeNamespace: "statsreporter",
instrumentationtypes.CodeFunctionName: "collectOrg",
})
var wg sync.WaitGroup
wg.Add(len(provider.collectors))
stats := make(map[string]any, 0)
mtx := sync.Mutex{}
for _, collector := range provider.collectors {
go func(collector statsreporter.StatsCollector) {
defer wg.Done()
collectorStats, err := collector.Collect(ctx, orgID)
if err != nil {
provider.settings.Logger().ErrorContext(ctx, "failed to collect stats", errors.Attr(err))
return
}
mtx.Lock()
for k, v := range collectorStats {
stats[k] = v
}
mtx.Unlock()
}(collector)
}
wg.Wait()
var traces uint64
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_traces.distributed_signoz_index_v3").Scan(&traces); err == nil {
stats["telemetry.traces.count"] = traces
}
var logs uint64
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_logs.distributed_logs_v2").Scan(&logs); err == nil {
stats["telemetry.logs.count"] = logs
}
var metrics uint64
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT COUNT(*) FROM signoz_metrics.distributed_samples_v4").Scan(&metrics); err == nil {
stats["telemetry.metrics.count"] = metrics
}
var tracesLastSeenAt time.Time
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT max(timestamp) FROM signoz_traces.distributed_signoz_index_v3").Scan(&tracesLastSeenAt); err == nil {
if tracesLastSeenAt.Unix() != 0 {
stats["telemetry.traces.last_observed.time"] = tracesLastSeenAt.UTC()
stats["telemetry.traces.last_observed.time_unix"] = tracesLastSeenAt.Unix()
}
}
var logsLastSeenAt time.Time
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT fromUnixTimestamp64Nano(max(timestamp)) FROM signoz_logs.distributed_logs_v2").Scan(&logsLastSeenAt); err == nil {
if logsLastSeenAt.Unix() != 0 {
stats["telemetry.logs.last_observed.time"] = logsLastSeenAt.UTC()
stats["telemetry.logs.last_observed.time_unix"] = logsLastSeenAt.Unix()
}
}
var metricsLastSeenAt time.Time
if err := provider.telemetryStore.ClickhouseDB().QueryRow(ctx, "SELECT toDateTime(max(unix_milli) / 1000) FROM signoz_metrics.distributed_samples_v4").Scan(&metricsLastSeenAt); err == nil {
if metricsLastSeenAt.Unix() != 0 {
stats["telemetry.metrics.last_observed.time"] = metricsLastSeenAt.UTC()
stats["telemetry.metrics.last_observed.time_unix"] = metricsLastSeenAt.Unix()
}
}
return stats
}

View File

@@ -0,0 +1,50 @@
package statsreporter
import (
"context"
"net/http"
"time"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Handler interface {
Get(http.ResponseWriter, *http.Request)
}
type handler struct {
aggregator Aggregator
}
func NewHandler(aggregator Aggregator) Handler {
return &handler{
aggregator: aggregator,
}
}
func (handler *handler) Get(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)
return
}
stats, err := handler.aggregator.Aggregate(ctx, orgID)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, stats)
}

View File

@@ -1,160 +0,0 @@
package spantypes
import (
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"go.opentelemetry.io/collector/pdata/ptrace"
)
var resourceSourcePrefix = FieldContextResource.StringValue() + "."
var ErrCodeMappingPreviewFailed = errors.MustNewCode("span_attribute_mapping_preview_failed")
type SpanMappingPreviewSpan struct {
ResourceAttributes map[string]any `json:"resourceAttributes" nullable:"true"`
SpanAttributes map[string]any `json:"spanAttributes" nullable:"true"`
}
type SpanMappingPreviewRequest struct {
Attributes *SpanMappingPreviewSpan `json:"attributes" nullable:"true"`
OtlpTraces map[string]any `json:"otlpTraces" nullable:"true"`
GroupID *string `json:"groupId" nullable:"true"`
}
type SpanMappingPreviewResponse struct {
Attributes *SpanMappingPreviewSpan `json:"attributes,omitempty" nullable:"true"`
OtlpTraces map[string]any `json:"otlpTraces,omitempty" nullable:"true"`
}
func SimulateSpanMapping(groups []*SpanMapperGroupWithMappers, resourceAttrs, spanAttrs map[string]any) (outResource, outSpan map[string]any) {
cfg := buildProcessorConfig(filterEnabledGroupsWithMappers(groups))
outResource = cloneAttrs(resourceAttrs)
outSpan = cloneAttrs(spanAttrs)
applyEnabledGroups(cfg, outSpan, outResource)
return outResource, outSpan
}
func SimulateSpanMappingOTLP(groups []*SpanMapperGroupWithMappers, otlp []byte) ([]byte, error) {
td, err := (&ptrace.JSONUnmarshaler{}).UnmarshalTraces(otlp)
if err != nil {
return nil, errors.WrapInvalidInputf(err, ErrCodeMappingInvalidInput, "invalid OTLP traces payload")
}
cfg := buildProcessorConfig(filterEnabledGroupsWithMappers(groups))
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
resourceAttrs := rs.Resource().Attributes().AsRaw()
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
spans := scopeSpans.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
spanAttrs := span.Attributes().AsRaw()
applyEnabledGroups(cfg, spanAttrs, resourceAttrs)
if err := span.Attributes().FromRaw(spanAttrs); err != nil {
return nil, errors.WrapInternalf(err, ErrCodeMappingPreviewFailed, "could not write transformed span attributes")
}
}
}
if err := rs.Resource().Attributes().FromRaw(resourceAttrs); err != nil {
return nil, errors.WrapInternalf(err, ErrCodeMappingPreviewFailed, "could not write transformed resource attributes")
}
}
out, err := (&ptrace.JSONMarshaler{}).MarshalTraces(td)
if err != nil {
return nil, errors.WrapInternalf(err, ErrCodeMappingPreviewFailed, "could not marshal transformed traces")
}
return out, nil
}
// TODO(spanmapper-preview): the apply logic below is a temporary copy of the signozspanmapper processor — remove it and call the real processor once signoz-otel-collector#796 is merged.
func applyEnabledGroups(cfg *spanMapperProcessorConfig, spanAttrs, resourceAttrs map[string]any) {
for i := range cfg.Groups {
g := &cfg.Groups[i]
if !spanMapperConditionMet(g.ExistsAny, spanAttrs, resourceAttrs) {
continue
}
for j := range g.Attributes {
applySpanMapperRule(&g.Attributes[j], spanAttrs, resourceAttrs)
}
}
}
// filterEnabledGroupsWithMappers keeps only enabled groups and their enabled
// mappers, dropping groups left with no enabled mappers.
func filterEnabledGroupsWithMappers(groups []*SpanMapperGroupWithMappers) []*SpanMapperGroupWithMappers {
out := make([]*SpanMapperGroupWithMappers, 0, len(groups))
for _, gm := range groups {
if gm == nil || gm.Group == nil || !gm.Group.Enabled {
continue
}
enabled := make([]*SpanMapper, 0, len(gm.Mappers))
for _, m := range gm.Mappers {
if m != nil && m.Enabled {
enabled = append(enabled, m)
}
}
if len(enabled) > 0 {
out = append(out, &SpanMapperGroupWithMappers{Group: gm.Group, Mappers: enabled})
}
}
return out
}
func spanMapperConditionMet(cond spanMapperProcessorExistsAny, spanAttrs, resourceAttrs map[string]any) bool {
return anyKeyContains(spanAttrs, cond.Attributes) || anyKeyContains(resourceAttrs, cond.Resource)
}
func anyKeyContains(attrs map[string]any, patterns []string) bool {
for k := range attrs {
for _, p := range patterns {
if strings.Contains(k, p) {
return true
}
}
}
return false
}
func applySpanMapperRule(rule *spanMapperProcessorAttribute, spanAttrs, resourceAttrs map[string]any) {
dst := spanAttrs
if rule.Context == FieldContextResource.StringValue() {
dst = resourceAttrs
}
for i := range rule.Sources {
src := &rule.Sources[i]
bare, isResource := strings.CutPrefix(src.Key, resourceSourcePrefix)
from := spanAttrs
if isResource {
from = resourceAttrs
}
val, ok := from[bare]
if !ok {
continue
}
dst[rule.Target] = val
if src.Action == SpanMapperOperationMove.StringValue() {
delete(from, bare)
}
return
}
}
func cloneAttrs(in map[string]any) map[string]any {
out := make(map[string]any, len(in))
for k, v := range in {
out[k] = v
}
return out
}

View File

@@ -1,192 +0,0 @@
package spantypes
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
)
func simGroup(name string, attrCond, resCond []string, mappers ...*SpanMapper) *SpanMapperGroupWithMappers {
return &SpanMapperGroupWithMappers{
Group: &SpanMapperGroup{
Name: name,
Condition: SpanMapperGroupCondition{Attributes: attrCond, Resource: resCond},
Enabled: true,
},
Mappers: mappers,
}
}
func simMapper(target string, ctx FieldContext, sources ...SpanMapperSource) *SpanMapper {
return &SpanMapper{
Name: target,
FieldContext: ctx,
Config: SpanMapperConfig{Sources: sources},
Enabled: true,
}
}
func simAttrSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
return SpanMapperSource{Key: key, Context: FieldContextSpanAttribute, Operation: op, Priority: priority}
}
func simResSrc(key string, op SpanMapperOperation, priority int) SpanMapperSource {
return SpanMapperSource{Key: key, Context: FieldContextResource, Operation: op, Priority: priority}
}
func TestSimulate_MatchInSpanAttrs(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("llm", []string{"model"}, nil,
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
),
}
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"llm.model": "gpt-4", "gen_ai.llm.model": "gpt-40"})
assert.Equal(t, "gpt-4", outSpan["gen_ai.request.model"])
}
func TestSimulate_MatchInResourceAttrs(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("llm", nil, []string{"service.name"},
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
simResSrc("service.name", SpanMapperOperationCopy, 1)),
),
}
_, outSpan := SimulateSpanMapping(groups, map[string]any{"service.name": "my-llm-service"}, nil)
assert.Equal(t, "my-llm-service", outSpan["gen_ai.request.model"])
}
func TestSimulate_NoMatchSkipsGroup(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("llm", []string{"model"}, nil,
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
),
}
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"some.other.key": "value"})
_, ok := outSpan["gen_ai.request.model"]
assert.False(t, ok, "target must not be set when condition is not met")
}
func TestSimulate_SourceFirstMatchWins(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("tokens", []string{"llm"}, nil,
simMapper("gen_ai.request.tokens", FieldContextSpanAttribute,
simAttrSrc("gen_ai.request_tokens", SpanMapperOperationCopy, 2),
simAttrSrc("llm.tokens", SpanMapperOperationCopy, 1)),
),
}
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"gen_ai.request_tokens": "100", "llm.tokens": "200"})
assert.Equal(t, "100", outSpan["gen_ai.request.tokens"])
}
func TestSimulate_SourceFallsBackToSecond(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("tokens", []string{"llm"}, nil,
simMapper("gen_ai.request.tokens", FieldContextSpanAttribute,
simAttrSrc("gen_ai.request_tokens", SpanMapperOperationCopy, 2),
simAttrSrc("llm.tokens", SpanMapperOperationCopy, 1)),
),
}
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"llm.tokens": "200"})
assert.Equal(t, "200", outSpan["gen_ai.request.tokens"])
}
func TestSimulate_ActionMove(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("input", []string{"gen_ai"}, nil,
simMapper("gen_ai.request.input", FieldContextSpanAttribute,
simAttrSrc("gen_ai.input", SpanMapperOperationMove, 1)),
),
}
_, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"gen_ai.input": "hello"})
assert.Equal(t, "hello", outSpan["gen_ai.request.input"])
_, srcPresent := outSpan["gen_ai.input"]
assert.False(t, srcPresent, "source key must be deleted when action=move")
}
func TestSimulate_WriteToResourceContext(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("llm", []string{"llm"}, nil,
simMapper("gen_ai.request.model", FieldContextResource,
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)),
),
}
outResource, outSpan := SimulateSpanMapping(groups, nil, map[string]any{"llm.model": "gpt-4"})
assert.Equal(t, "gpt-4", outResource["gen_ai.request.model"], "target must be written to resource attributes")
_, inSpan := outSpan["gen_ai.request.model"]
assert.False(t, inSpan)
}
func TestSimulate_DisabledGroupsAndMappersSkipped(t *testing.T) {
disabledGroup := simGroup("g1", []string{"llm"}, nil,
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
simAttrSrc("llm.model", SpanMapperOperationCopy, 1)))
disabledGroup.Group.Enabled = false
_, outSpan := SimulateSpanMapping([]*SpanMapperGroupWithMappers{disabledGroup}, nil, map[string]any{"llm.model": "gpt-4"})
_, ok := outSpan["gen_ai.request.model"]
assert.False(t, ok, "disabled groups must not be evaluated")
}
func TestSimulate_DoesNotMutateInput(t *testing.T) {
input := map[string]any{"gen_ai.input": "hi"}
groups := []*SpanMapperGroupWithMappers{
simGroup("input", []string{"gen_ai"}, nil,
simMapper("gen_ai.request.input", FieldContextSpanAttribute,
simAttrSrc("gen_ai.input", SpanMapperOperationMove, 1))),
}
_, _ = SimulateSpanMapping(groups, nil, input)
// Original input map must be untouched (move would have deleted the key).
_, ok := input["gen_ai.input"]
assert.True(t, ok, "input map must not be mutated by the preview")
}
func TestSimulateOTLP_TransformsSpan(t *testing.T) {
groups := []*SpanMapperGroupWithMappers{
simGroup("llm", []string{"model"}, nil,
simMapper("gen_ai.request.model", FieldContextSpanAttribute,
simAttrSrc("llm.model", SpanMapperOperationMove, 1)),
),
}
otlp := []byte(`{
"resourceSpans": [{
"resource": {"attributes": [{"key": "service.name", "value": {"stringValue": "checkout"}}]},
"scopeSpans": [{
"spans": [{"attributes": [{"key": "llm.model", "value": {"stringValue": "gpt-4"}}]}]
}]
}]
}`)
out, err := SimulateSpanMappingOTLP(groups, otlp)
require.NoError(t, err)
td, err := (&ptrace.JSONUnmarshaler{}).UnmarshalTraces(out)
require.NoError(t, err)
span := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
v, ok := span.Attributes().Get("gen_ai.request.model")
require.True(t, ok)
assert.Equal(t, "gpt-4", v.Str())
// move must have deleted the source.
_, srcPresent := span.Attributes().Get("llm.model")
assert.False(t, srcPresent)
}
func TestSimulateOTLP_Invalid(t *testing.T) {
_, err := SimulateSpanMappingOTLP(nil, []byte(`{ not json`))
assert.Error(t, err)
}