Compare commits

..

1 Commits

Author SHA1 Message Date
nityanandagohain
d1682f2ab6 feat: span mapper test endpoint 2026-06-19 18:31:31 +05:30
23 changed files with 678 additions and 571 deletions

View File

@@ -6992,6 +6992,16 @@ components:
required:
- items
type: object
SpantypesGettableSpanMapperTest:
properties:
spans:
items:
$ref: '#/components/schemas/SpantypesSpanMapperTestSpan'
nullable: true
type: array
required:
- spans
type: object
SpantypesGettableTraceAggregations:
properties:
aggregations:
@@ -7079,6 +7089,39 @@ components:
- name
- condition
type: object
SpantypesPostableSpanMapperTest:
properties:
groups:
items:
$ref: '#/components/schemas/SpantypesPostableSpanMapperTestGroup'
nullable: true
type: array
spans:
items:
$ref: '#/components/schemas/SpantypesSpanMapperTestSpan'
nullable: true
type: array
required:
- spans
- groups
type: object
SpantypesPostableSpanMapperTestGroup:
properties:
condition:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCondition'
enabled:
type: boolean
mappers:
items:
$ref: '#/components/schemas/SpantypesPostableSpanMapper'
nullable: true
type: array
name:
type: string
required:
- name
- condition
type: object
SpantypesPostableTraceAggregations:
properties:
aggregations:
@@ -7240,6 +7283,17 @@ components:
- operation
- priority
type: object
SpantypesSpanMapperTestSpan:
properties:
attributes:
additionalProperties: {}
nullable: true
type: object
resource:
additionalProperties: {}
nullable: true
type: object
type: object
SpantypesUpdatableSpanMapper:
properties:
config:
@@ -12797,6 +12851,69 @@ paths:
summary: Update a span mapper
tags:
- spanmapper
/api/v1/span_mapper_groups/test:
post:
deprecated: false
description: Tests how span mappers would transform sample spans
operationId: TestSpanMappers
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesPostableSpanMapperTest'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesGettableSpanMapperTest'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Test span mappers against sample spans
tags:
- spanmapper
/api/v1/stats:
get:
deprecated: false

View File

@@ -8135,6 +8135,44 @@ export interface SpantypesGettableSpanMapperGroupsDTO {
items: SpantypesSpanMapperGroupDTO[];
}
export type SpantypesSpanMapperTestSpanDTOAttributesAnyOf = {
[key: string]: unknown;
};
/**
* @nullable
*/
export type SpantypesSpanMapperTestSpanDTOAttributes =
SpantypesSpanMapperTestSpanDTOAttributesAnyOf | null;
export type SpantypesSpanMapperTestSpanDTOResourceAnyOf = {
[key: string]: unknown;
};
/**
* @nullable
*/
export type SpantypesSpanMapperTestSpanDTOResource =
SpantypesSpanMapperTestSpanDTOResourceAnyOf | null;
export interface SpantypesSpanMapperTestSpanDTO {
/**
* @type object,null
*/
attributes?: SpantypesSpanMapperTestSpanDTOAttributes;
/**
* @type object,null
*/
resource?: SpantypesSpanMapperTestSpanDTOResource;
}
export interface SpantypesGettableSpanMapperTestDTO {
/**
* @type array,null
*/
spans: SpantypesSpanMapperTestSpanDTO[] | null;
}
export enum SpantypesSpanAggregationTypeDTO {
span_count = 'span_count',
execution_time_percentage = 'execution_time_percentage',
@@ -8430,6 +8468,33 @@ export interface SpantypesPostableSpanMapperGroupDTO {
name: string;
}
export interface SpantypesPostableSpanMapperTestGroupDTO {
condition: SpantypesSpanMapperGroupConditionDTO | null;
/**
* @type boolean
*/
enabled?: boolean;
/**
* @type array,null
*/
mappers?: SpantypesPostableSpanMapperDTO[] | null;
/**
* @type string
*/
name: string;
}
export interface SpantypesPostableSpanMapperTestDTO {
/**
* @type array,null
*/
groups: SpantypesPostableSpanMapperTestGroupDTO[] | null;
/**
* @type array,null
*/
spans: SpantypesSpanMapperTestSpanDTO[] | null;
}
export interface SpantypesSpanAggregationDTO {
aggregation: SpantypesSpanAggregationTypeDTO;
field: TelemetrytypesTelemetryFieldKeyDTO;
@@ -9798,6 +9863,14 @@ export type UpdateSpanMapperPathParameters = {
groupId: string;
mapperId: string;
};
export type TestSpanMappers200 = {
data: SpantypesGettableSpanMapperTestDTO;
/**
* @type string
*/
status: string;
};
export type GetStats200Data = { [key: string]: unknown };
export type GetStats200 = {

View File

@@ -30,8 +30,10 @@ import type {
RenderErrorResponseDTO,
SpantypesPostableSpanMapperDTO,
SpantypesPostableSpanMapperGroupDTO,
SpantypesPostableSpanMapperTestDTO,
SpantypesUpdatableSpanMapperDTO,
SpantypesUpdatableSpanMapperGroupDTO,
TestSpanMappers200,
UpdateSpanMapperGroupPathParameters,
UpdateSpanMapperPathParameters,
} from '../sigNoz.schemas';
@@ -780,3 +782,86 @@ export const useUpdateSpanMapper = <
> => {
return useMutation(getUpdateSpanMapperMutationOptions(options));
};
/**
* Tests how span mappers would transform sample spans
* @summary Test span mappers against sample spans
*/
export const testSpanMappers = (
spantypesPostableSpanMapperTestDTO?: BodyType<SpantypesPostableSpanMapperTestDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<TestSpanMappers200>({
url: `/api/v1/span_mapper_groups/test`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: spantypesPostableSpanMapperTestDTO,
signal,
});
};
export const getTestSpanMappersMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof testSpanMappers>>,
TError,
{ data?: BodyType<SpantypesPostableSpanMapperTestDTO> },
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof testSpanMappers>>,
TError,
{ data?: BodyType<SpantypesPostableSpanMapperTestDTO> },
TContext
> => {
const mutationKey = ['testSpanMappers'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof testSpanMappers>>,
{ data?: BodyType<SpantypesPostableSpanMapperTestDTO> }
> = (props) => {
const { data } = props ?? {};
return testSpanMappers(data);
};
return { mutationFn, ...mutationOptions };
};
export type TestSpanMappersMutationResult = NonNullable<
Awaited<ReturnType<typeof testSpanMappers>>
>;
export type TestSpanMappersMutationBody =
| BodyType<SpantypesPostableSpanMapperTestDTO>
| undefined;
export type TestSpanMappersMutationError = ErrorType<RenderErrorResponseDTO>;
/**
* @summary Test span mappers against sample spans
*/
export const useTestSpanMappers = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof testSpanMappers>>,
TError,
{ data?: BodyType<SpantypesPostableSpanMapperTestDTO> },
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof testSpanMappers>>,
TError,
{ data?: BodyType<SpantypesPostableSpanMapperTestDTO> },
TContext
> => {
return useMutation(getTestSpanMappersMutationOptions(options));
};

View File

@@ -51,6 +51,26 @@ func (provider *provider) addSpanMapperRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/test", handler.New(
provider.authzMiddleware.ViewAccess(provider.spanMapperHandler.TestMappers),
handler.OpenAPIDef{
ID: "TestSpanMappers",
Tags: []string{"spanmapper"},
Summary: "Test span mappers against sample spans",
Description: "Tests how span mappers would transform sample spans",
Request: new(spantypes.PostableSpanMapperTest),
RequestContentType: "application/json",
Response: new(spantypes.GettableSpanMapperTest),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v1/span_mapper_groups/{groupId}", handler.New(
provider.authzMiddleware.AdminAccess(provider.spanMapperHandler.UpdateGroup),
handler.OpenAPIDef{

View File

@@ -3,16 +3,15 @@ package flagger
import "github.com/SigNoz/signoz/pkg/types/featuretypes"
var (
FeatureUseSpanMetrics = featuretypes.MustNewName("use_span_metrics")
FeatureKafkaSpanEval = featuretypes.MustNewName("kafka_span_eval")
FeatureHideRootUser = featuretypes.MustNewName("hide_root_user")
FeatureGetMetersFromZeus = featuretypes.MustNewName("get_meters_from_zeus")
FeaturePutMetersInZeus = featuretypes.MustNewName("put_meters_in_zeus")
FeatureUseMeterReporter = featuretypes.MustNewName("use_meter_reporter")
FeatureUseJSONBody = featuretypes.MustNewName("use_json_body")
FeatureUseFineGrainedAuthz = featuretypes.MustNewName("use_fine_grained_authz")
FeatureUseDashboardV2 = featuretypes.MustNewName("use_dashboard_v2")
FeatureEnableMetricsReduction = featuretypes.MustNewName("enable_metrics_reduction")
FeatureUseSpanMetrics = featuretypes.MustNewName("use_span_metrics")
FeatureKafkaSpanEval = featuretypes.MustNewName("kafka_span_eval")
FeatureHideRootUser = featuretypes.MustNewName("hide_root_user")
FeatureGetMetersFromZeus = featuretypes.MustNewName("get_meters_from_zeus")
FeaturePutMetersInZeus = featuretypes.MustNewName("put_meters_in_zeus")
FeatureUseMeterReporter = featuretypes.MustNewName("use_meter_reporter")
FeatureUseJSONBody = featuretypes.MustNewName("use_json_body")
FeatureUseFineGrainedAuthz = featuretypes.MustNewName("use_fine_grained_authz")
FeatureUseDashboardV2 = featuretypes.MustNewName("use_dashboard_v2")
)
func MustNewRegistry() featuretypes.Registry {
@@ -89,14 +88,6 @@ func MustNewRegistry() featuretypes.Registry {
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureEnableMetricsReduction,
Kind: featuretypes.KindBoolean,
Stage: featuretypes.StageExperimental,
Description: "Controls whether metrics cardinality reduction (buffer/reduced tables) is read by the querier",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
)
if err != nil {
panic(err)

View File

@@ -341,12 +341,12 @@ func alignedMetricWindow(startMs, endMs int64) (
}
tsAdjustedStartMs, _, distributedTSTable, localTSTable := telemetrymetrics.WhichTSTableToUse(
samplesAdjustedStartMs, flooredEndMs, false, nil,
samplesAdjustedStartMs, flooredEndMs, nil,
)
distributedSamplesTable, localSamplesTable := telemetrymetrics.WhichSamplesTableToUse(
samplesAdjustedStartMs, flooredEndMs,
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, false, nil,
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
)
return samplesAdjustedStartMs, flooredEndMs, tsAdjustedStartMs, distributedTSTable, localTSTable, distributedSamplesTable, localSamplesTable

View File

@@ -141,7 +141,7 @@ func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *met
sb.Select("DISTINCT metric_name")
if params.Start != nil && params.End != nil {
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(*params.Start), uint64(*params.End), false, nil)
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(*params.Start), uint64(*params.End), nil)
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
sb.Where(sb.Between("unix_milli", start, end))
} else {
@@ -527,7 +527,7 @@ func (m *module) InspectMetrics(
return nil, err
}
tsStart, _, tsTable, _ := telemetrymetrics.WhichTSTableToUse(start, end, false, nil)
tsStart, _, tsTable, _ := telemetrymetrics.WhichTSTableToUse(start, end, nil)
tsSb := sqlbuilder.NewSelectBuilder()
tsSb.Select("fingerprint", "labels")
tsSb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, tsTable))
@@ -971,8 +971,8 @@ func (m *module) fetchMetricsStatsWithSamples(
}
}
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), false, nil)
distributedSamplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, false, nil)
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
distributedSamplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
countExp := telemetrymetrics.CountExpressionForSamplesTable(distributedSamplesTable)
// Timeseries counts per metric
@@ -1100,7 +1100,7 @@ func (m *module) computeTimeseriesTreemap(ctx context.Context, req *metricsexplo
}
}
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), false, nil)
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
totalTSBuilder := sqlbuilder.NewSelectBuilder()
totalTSBuilder.Select("uniq(fingerprint) AS total_time_series")
@@ -1176,8 +1176,8 @@ func (m *module) computeSamplesTreemap(ctx context.Context, req *metricsexplorer
}
}
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), false, nil)
distributedSamplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, false, nil)
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
distributedSamplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
countExp := telemetrymetrics.CountExpressionForSamplesTable(distributedSamplesTable)
candidateLimit := req.Limit + 50

View File

@@ -273,6 +273,35 @@ func (h *handler) DeleteMapper(rw http.ResponseWriter, r *http.Request) {
render.Success(rw, http.StatusNoContent, nil)
}
// TestMappers handles POST /api/v1/span_mapper_groups/test.
func (h *handler) TestMappers(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
orgID := valuer.MustNewUUID(claims.OrgID)
req := new(spantypes.PostableSpanMapperTest)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
groups := spantypes.NewSpanMapperGroupsWithMappersFromPostable(orgID, req.Groups)
out, err := h.module.TestMappers(ctx, orgID, req.Spans, groups)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, &spantypes.GettableSpanMapperTest{Spans: out})
}
// groupIDFromPath extracts and validates the {id} or {groupId} path variable.
func groupIDFromPath(r *http.Request) (valuer.UUID, error) {
vars := mux.Vars(r)

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
@@ -102,6 +103,54 @@ func (module *module) DeleteMapper(ctx context.Context, orgID, groupID, id value
return nil
}
func (module *module) TestMappers(ctx context.Context, orgID valuer.UUID, spans []spantypes.SpanMapperTestSpan, groups []*spantypes.SpanMapperGroupWithMappers) ([]spantypes.SpanMapperTestSpan, error) {
if len(spans) == 0 {
return nil, errors.New(errors.TypeInvalidInput, spantypes.ErrCodeMappingInvalidInput, "'spans' must contain at least one span")
}
_, err := module.backfillMappers(ctx, orgID, groups)
if err != nil {
return nil, err
}
// out, _, err := spantypes.SimulateSpanMappersProcessing(ctx, resolved, spans)
// if err != nil {
// return nil, err
// }
// return out, nil
return nil, nil
}
// backfillMappers loads saved mappers for any group whose Mappers is nil.
func (module *module) backfillMappers(ctx context.Context, orgID valuer.UUID, groups []*spantypes.SpanMapperGroupWithMappers) ([]*spantypes.SpanMapperGroupWithMappers, error) {
// Load all the saved groups for this org, so we can look up by name.
savedGroups, err := module.store.ListGroups(ctx, orgID, &spantypes.ListSpanMapperGroupsQuery{})
if err != nil {
return nil, err
}
savedByName := make(map[string]*spantypes.SpanMapperGroup, len(savedGroups))
for _, g := range savedGroups {
savedByName[g.Name] = g
}
// For each group in the request, if Mappers is nil, load the saved mappers for that group name.
for _, g := range groups {
if g.Mappers != nil {
continue
}
saved, ok := savedByName[g.Group.Name]
if !ok {
return nil, errors.Newf(errors.TypeInvalidInput, spantypes.ErrCodeMappingGroupNotFound, "no saved group named %q to load mappers from; send 'mappers' for new or edited groups", g.Group.Name)
}
loaded, err := module.store.ListMappers(ctx, orgID, saved.ID)
if err != nil {
return nil, err
}
g.Mappers = loaded
}
return groups, nil
}
func (module *module) AgentFeatureType() agentConf.AgentFeatureType {
return spantypes.SpanAttrMappingFeatureType
}

View File

@@ -27,6 +27,7 @@ type Module interface {
CreateMapper(ctx context.Context, orgID, groupID valuer.UUID, mapper *spantypes.SpanMapper) error
UpdateMapper(ctx context.Context, orgID, groupID, id valuer.UUID, fieldContext spantypes.FieldContext, config *spantypes.SpanMapperConfig, enabled *bool, updatedBy string) error
DeleteMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error
TestMappers(ctx context.Context, orgID valuer.UUID, spans []spantypes.SpanMapperTestSpan, groups []*spantypes.SpanMapperGroupWithMappers) ([]spantypes.SpanMapperTestSpan, error)
}
// Handler defines the HTTP handler interface for mapping group and mapper endpoints.
@@ -42,4 +43,5 @@ type Handler interface {
CreateMapper(rw http.ResponseWriter, r *http.Request)
UpdateMapper(rw http.ResponseWriter, r *http.Request)
DeleteMapper(rw http.ResponseWriter, r *http.Request)
TestMappers(rw http.ResponseWriter, r *http.Request)
}

View File

@@ -91,22 +91,13 @@ func (q *builderQuery[T]) Fingerprint() string {
if a.ComparisonSpaceAggregationParam != nil {
spaceAggParamStr = a.ComparisonSpaceAggregationParam.StringValue()
}
part := fmt.Sprintf("%s:%s:%s:%s:%s",
aggParts = append(aggParts, fmt.Sprintf("%s:%s:%s:%s:%s",
a.MetricName,
a.Temporality.StringValue(),
a.TimeAggregation.StringValue(),
a.SpaceAggregation.StringValue(),
spaceAggParamStr,
)
if a.Reduced {
oneDay := uint64(24 * time.Hour.Milliseconds())
route := "reduced"
if q.toMS-q.fromMS < oneDay && q.fromMS >= uint64(time.Now().UnixMilli())-oneDay {
route = "buffer"
}
part += ":" + route
}
aggParts = append(aggParts, part)
))
}
}
parts = append(parts, fmt.Sprintf("aggs=[%s]", strings.Join(aggParts, ",")))

View File

@@ -119,7 +119,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
queries := make(map[string]qbtypes.Query)
steps := make(map[string]qbtypes.Step)
missingMetricQueries, metricWarnings, err := q.resolveMetricMetadata(ctx, orgID, req.CompositeQuery.Queries, req.Start, req.End)
missingMetricQueries, metricWarnings, err := q.resolveMetricMetadata(ctx, req.CompositeQuery.Queries, req.Start, req.End)
if err != nil {
return nil, err
}
@@ -304,7 +304,7 @@ func (q *querier) populateQBEvent(event *qbtypes.QBEvent, queries []qbtypes.Quer
// resolved: never-seen metrics and dormant metrics (seen but no data in
// the query window).
// - err: Internal when a metadata fetch fails.
func (q *querier) resolveMetricMetadata(ctx context.Context, orgID valuer.UUID, queries []qbtypes.QueryEnvelope, start, end uint64) (missingMetricQueries []string, metricWarnings []string, err error) {
func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.QueryEnvelope, start, end uint64) (missingMetricQueries []string, metricWarnings []string, err error) {
metricNames := make([]string, 0)
for idx := range queries {
if queries[idx].Type != qbtypes.QueryTypeBuilder {
@@ -325,7 +325,7 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, orgID valuer.UUID,
return nil, nil, nil
}
metricTemporality, metricTypes, reducedMetricsSet, err := q.metadataStore.FetchTemporalityAndTypeMulti(ctx, orgID, start, end, metricNames...)
metricTemporality, metricTypes, err := q.metadataStore.FetchTemporalityAndTypeMulti(ctx, start, end, metricNames...)
if err != nil {
q.logger.WarnContext(ctx, "failed to fetch metric temporality", errors.Attr(err), slog.Any("metrics", metricNames))
return nil, nil, errors.NewInternalf(errors.CodeInternal, "failed to fetch metrics temporality")
@@ -362,9 +362,6 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, orgID valuer.UUID,
if err := spec.Aggregations[i].ValidateForType(); err != nil {
return nil, nil, err
}
if reducedMetricsSet[spec.Aggregations[i].MetricName] {
spec.Aggregations[i].Reduced = true
}
presentAggregations = append(presentAggregations, spec.Aggregations[i])
}
if len(presentAggregations) == 0 {

View File

@@ -2136,12 +2136,12 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
return values, complete, nil
}
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
if metricName == "" {
return metrictypes.Unknown, errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric name cannot be empty")
}
temporalityMap, err := t.FetchTemporalityMulti(ctx, orgID, queryTimeRangeStartTs, queryTimeRangeEndTs, metricName)
temporalityMap, err := t.FetchTemporalityMulti(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricName)
if err != nil {
return metrictypes.Unknown, err
}
@@ -2154,27 +2154,25 @@ func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, orgID valuer.
return temporality, nil
}
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
temporalities, _, _, err := t.FetchTemporalityAndTypeMulti(ctx, orgID, queryTimeRangeStartTs, queryTimeRangeEndTs, metricNames...)
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
temporalities, _, err := t.FetchTemporalityAndTypeMulti(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricNames...)
return temporalities, err
}
func (t *telemetryMetaStore) FetchTemporalityAndTypeMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, map[string]bool, error) {
func (t *telemetryMetaStore) FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error) {
if len(metricNames) == 0 {
return make(map[string]metrictypes.Temporality), make(map[string]metrictypes.Type), make(map[string]bool), nil
return make(map[string]metrictypes.Temporality), make(map[string]metrictypes.Type), nil
}
reductionEnabled := t.fl.BooleanOrEmpty(ctx, flagger.FeatureEnableMetricsReduction, featuretypes.NewFlaggerEvaluationContext(orgID))
temporalities := make(map[string]metrictypes.Temporality)
types := make(map[string]metrictypes.Type)
metricsTemporality, metricTypes, reduced, err := t.fetchMetricsTemporalityAndType(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, reductionEnabled, metricNames...)
metricsTemporality, metricTypes, err := t.fetchMetricsTemporalityAndType(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricNames...)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
meterMetricsTemporality, meterMetricsTypes, err := t.fetchMeterSourceMetricsTemporalityAndType(ctx, metricNames...)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
// For metrics not found in the database, set to Unknown
@@ -2199,10 +2197,10 @@ func (t *telemetryMetaStore) FetchTemporalityAndTypeMulti(ctx context.Context, o
}
}
return temporalities, types, reduced, nil
return temporalities, types, nil
}
func (t *telemetryMetaStore) fetchMetricsTemporalityAndType(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, reductionEnabled bool, metricNames ...string) (map[string][]metrictypes.Temporality, map[string]metrictypes.Type, map[string]bool, error) {
func (t *telemetryMetaStore) fetchMetricsTemporalityAndType(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string][]metrictypes.Temporality, map[string]metrictypes.Type, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalMetrics.StringValue(),
instrumentationtypes.CodeNamespace: "metadata",
@@ -2210,58 +2208,48 @@ func (t *telemetryMetaStore) fetchMetricsTemporalityAndType(ctx context.Context,
})
temporalities := make(map[string][]metrictypes.Temporality)
types := make(map[string]metrictypes.Type)
reduced := make(map[string]bool)
adjustedStartTs, adjustedEndTs, tsTableName, _ := telemetrymetrics.WhichTSTableToUse(queryTimeRangeStartTs, queryTimeRangeEndTs, false, nil)
adjustedStartTs, adjustedEndTs, tsTableName, _ := telemetrymetrics.WhichTSTableToUse(queryTimeRangeStartTs, queryTimeRangeEndTs, nil)
cols := []string{"metric_name", "temporality", "any(type) AS type", "any(is_monotonic) as is_monotonic"}
// Build query to fetch temporality for all metrics
// We use attr_string_value where attr_name = '__temporality__'
// Note: The columns are mixed in the current data - temporality column contains metric_name
// and metric_name column contains temporality value, so we use the correct mapping
sb := sqlbuilder.Select(
"metric_name",
"temporality",
"any(type) AS type",
"any(is_monotonic) as is_monotonic",
).
From(t.metricsDBName + "." + tsTableName)
// When reduction is enabled, fold the reduced-catalog presence check into the
// same query so a metric's reduced status comes back in one round trip.
var reducedArgs []any
if reductionEnabled {
rs := sqlbuilder.NewSelectBuilder()
rs.Select("metric_name")
rs.From(t.metricsDBName + "." + telemetrymetrics.TimeseriesV4ReducedTableName)
rs.Where(rs.In("metric_name", metricNames), rs.GTE("unix_milli", adjustedStartTs), rs.LT("unix_milli", adjustedEndTs))
rs.GroupBy("metric_name")
rsQuery, rsArgs := rs.BuildWithFlavor(sqlbuilder.ClickHouse)
cols = append(cols, fmt.Sprintf("metric_name GLOBAL IN (%s) AS reduced", rsQuery))
reducedArgs = rsArgs
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select(cols...)
sb.From(t.metricsDBName + "." + tsTableName)
// Filter by metric names (in the temporality column due to data mix-up)
sb.Where(
sb.In("metric_name", metricNames),
sb.GTE("unix_milli", adjustedStartTs),
sb.LT("unix_milli", adjustedEndTs),
)
sb.GroupBy("metric_name", "temporality")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, reducedArgs...)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
t.logger.DebugContext(ctx, "fetching metric temporality", slog.String("query", query), slog.Any("args", args))
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch metric temporality")
return nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch metric temporality")
}
defer rows.Close()
// Process results
for rows.Next() {
var metricName string
var temporality metrictypes.Temporality
var metricType metrictypes.Type
var isMonotonic bool
var isReduced uint8
dest := []any{&metricName, &temporality, &metricType, &isMonotonic}
if reductionEnabled {
dest = append(dest, &isReduced)
}
if err := rows.Scan(dest...); err != nil {
return nil, nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
if err := rows.Scan(&metricName, &temporality, &metricType, &isMonotonic); err != nil {
return nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
}
if temporality != metrictypes.Unknown {
temporalities[metricName] = append(temporalities[metricName], temporality)
@@ -2270,15 +2258,12 @@ func (t *telemetryMetaStore) fetchMetricsTemporalityAndType(ctx context.Context,
metricType = metrictypes.GaugeType
}
types[metricName] = metricType
if isReduced != 0 {
reduced[metricName] = true
}
}
if err := rows.Err(); err != nil {
return nil, nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "error iterating over metrics temporality rows")
return nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "error iterating over metrics temporality rows")
}
return temporalities, types, reduced, nil
return temporalities, types, nil
}
func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporalityAndType(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error) {

View File

@@ -1,157 +0,0 @@
package telemetrymetrics
import (
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
)
func reducedQuery(metric string, ty metrictypes.Type, temp metrictypes.Temporality, ta metrictypes.TimeAggregation, sa metrictypes.SpaceAggregation) qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation] {
return qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 5 * time.Minute},
Aggregations: []qbtypes.MetricAggregation{{
MetricName: metric,
Type: ty,
Temporality: temp,
TimeAggregation: ta,
SpaceAggregation: sa,
Reduced: true,
}},
}
}
func TestReducedStatementBuilder(t *testing.T) {
cases := []struct {
name string
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
expected qbtypes.Statement
}{
{
name: "gauge_sum_latest",
query: reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationLatest, metrictypes.SpaceAggregationSum),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, anyLast(last) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, argMax(value, unix_milli) AS per_series_value FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum_last`, computed_at) AS value FROM signoz_metrics.distributed_samples_v4_reduced_last_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "unspecified", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
{
name: "gauge_avg_avg",
query: reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationAvg, metrictypes.SpaceAggregationAvg),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, sum(sum) / sum(count) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, avg(value) AS per_series_value, avg(weight) AS per_series_weight FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum_last`, computed_at) AS value, argMax(`count_series`, computed_at) AS weight FROM signoz_metrics.distributed_samples_v4_reduced_last_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) / sum(per_series_weight) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "unspecified", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
{
name: "gauge_min_min",
query: reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationMin, metrictypes.SpaceAggregationMin),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, min(min) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, min(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, min(value) AS per_series_value FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`min`, computed_at) AS value FROM signoz_metrics.distributed_samples_v4_reduced_last_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, min(per_series_value) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "unspecified", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
{
name: "gauge_max_max",
query: reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationMax, metrictypes.SpaceAggregationMax),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, max(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(value) AS per_series_value FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`max`, computed_at) AS value FROM signoz_metrics.distributed_samples_v4_reduced_last_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, max(per_series_value) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "unspecified", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
{
name: "counter_sum_rate",
query: reducedQuery("test.metric", metrictypes.SumType, metrictypes.Cumulative, metrictypes.TimeAggregationRate, metrictypes.SpaceAggregationSum),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT ts, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, sum(value) / 300 AS per_series_value FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum`, computed_at) AS value FROM signoz_metrics.distributed_samples_v4_reduced_sum_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "cumulative", false, "test.metric", uint64(1746999600000), uint64(1747172760000), 0, "test.metric", uint64(1746999600000), uint64(1747172760000), "test.metric", uint64(1746999600000), uint64(1747172760000), false},
},
},
{
name: "counter_avg_increase",
query: reducedQuery("test.metric", metrictypes.SumType, metrictypes.Cumulative, metrictypes.TimeAggregationIncrease, metrictypes.SpaceAggregationAvg),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT ts, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value, per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, sum(value) AS per_series_value, avg(weight) AS per_series_weight FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum`, computed_at) AS value, argMax(`count_series`, computed_at) AS weight FROM signoz_metrics.distributed_samples_v4_reduced_sum_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) / sum(per_series_weight) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "cumulative", false, "test.metric", uint64(1746999600000), uint64(1747172760000), 0, "test.metric", uint64(1746999600000), uint64(1747172760000), "test.metric", uint64(1746999600000), uint64(1747172760000), false},
},
},
{
name: "counter_min_omitted",
query: reducedQuery("test.metric", metrictypes.SumType, metrictypes.Cumulative, metrictypes.TimeAggregationRate, metrictypes.SpaceAggregationMin),
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, min(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "cumulative", false, "test.metric", uint64(1746999600000), uint64(1747172760000), 0},
},
},
{
name: "counter_max_omitted",
query: reducedQuery("test.metric", metrictypes.SumType, metrictypes.Cumulative, metrictypes.TimeAggregationRate, metrictypes.SpaceAggregationMax),
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, max(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "cumulative", false, "test.metric", uint64(1746999600000), uint64(1747172760000), 0},
},
},
{
name: "histogram_p99",
query: reducedQuery("test.metric", metrictypes.HistogramType, metrictypes.Cumulative, metrictypes.TimeAggregationUnspecified, metrictypes.SpaceAggregationPercentile99),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT ts, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, `le`, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `le`) SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) AS value FROM __spatial_aggregation_cte GROUP BY ts ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, `le`, sum(value) / 300 AS per_series_value FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum`, computed_at) AS value FROM signoz_metrics.distributed_samples_v4_reduced_sum_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts, `le`), __spatial_aggregation_cte AS (SELECT ts, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte GROUP BY ts, `le`) SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) AS value FROM __spatial_aggregation_cte GROUP BY ts ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "cumulative", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
{
name: "summary_avg",
query: reducedQuery("test.metric", metrictypes.SummaryType, metrictypes.Unspecified, metrictypes.TimeAggregationAvg, metrictypes.SpaceAggregationAvg),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, sum(sum) / sum(count) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, avg(value) AS per_series_value, avg(weight) AS per_series_weight FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum_last`, computed_at) AS value, argMax(`count_series`, computed_at) AS weight FROM signoz_metrics.distributed_samples_v4_reduced_last_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) / sum(per_series_weight) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "unspecified", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
fl, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
require.NoError(t, err)
sb := NewMetricQueryStatementBuilder(instrumentationtest.New().ToProviderSettings(), telemetrytypestest.NewMockMetadataStore(), fm, cb, fl)
const start, end = uint64(1747000000000), uint64(1747172800000)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got, err := sb.Build(context.Background(), start, end, qbtypes.RequestTypeTimeSeries, c.query, nil)
require.NoError(t, err)
require.Equal(t, c.expected.Query, got.Query)
require.Equal(t, c.expected.Args, got.Args)
})
}
t.Run("buffer_recent_window", func(t *testing.T) {
now := time.Now().UnixMilli()
q := reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationLatest, metrictypes.SpaceAggregationSum)
got, err := sb.Build(context.Background(), uint64(now-2*time.Hour.Milliseconds()), uint64(now), qbtypes.RequestTypeTimeSeries, q, nil)
require.NoError(t, err)
require.Contains(t, got.Query, "signoz_metrics.distributed_samples_v4_buffer")
require.Contains(t, got.Query, "signoz_metrics.time_series_v4_buffer")
require.Contains(t, got.Query, "is_reduced")
require.NotContains(t, got.Query, "UNION ALL")
})
t.Run("not_reduced", func(t *testing.T) {
q := reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationLatest, metrictypes.SpaceAggregationSum)
q.Aggregations[0].Reduced = false
got, err := sb.Build(context.Background(), start, end, qbtypes.RequestTypeTimeSeries, q, nil)
require.NoError(t, err)
require.NotContains(t, got.Query, "UNION ALL")
require.NotContains(t, got.Query, "reduced")
require.NotContains(t, got.Query, "buffer")
})
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"time"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
@@ -181,30 +180,19 @@ func (b *MetricQueryStatementBuilder) buildPipelineStatement(
query.Aggregations[0].SpaceAggregation = metrictypes.SpaceAggregationSum
}
agg := query.Aggregations[0]
// A reduced metric reads the raw buffer for recent short windows, and
// samples_v4/agg (unioned with the reduced tables) otherwise. The buffer is
// shaped exactly like samples_v4 / time_series_v4, so once the table names are
// chosen the rest of the pipeline is unchanged.
useBuffer := agg.Reduced &&
end-start < oneDayInMilliseconds &&
start >= uint64(time.Now().UnixMilli())-oneDayInMilliseconds
samplesTable, _ := WhichSamplesTableToUse(start, end, agg.Type, agg.TimeAggregation, useBuffer, agg.TableHints)
tsStart, tsEnd, _, tsTable := WhichTSTableToUse(start, end, useBuffer, agg.TableHints)
var timeSeriesCTE string
var timeSeriesCTEArgs []any
var err error
if timeSeriesCTE, timeSeriesCTEArgs, err = b.buildTimeSeriesCTE(ctx, tsStart, tsEnd, query, keys, variables, tsTable); err != nil {
// time_series_cte
// this is applicable for all the queries
if timeSeriesCTE, timeSeriesCTEArgs, err = b.buildTimeSeriesCTE(ctx, start, end, query, keys, variables); err != nil {
return nil, err
}
if qbtypes.CanShortCircuitDelta(query.Aggregations[0]) {
// spatial_aggregation_cte directly for certain delta queries
if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query, samplesTable, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
@@ -212,7 +200,7 @@ func (b *MetricQueryStatementBuilder) buildPipelineStatement(
}
} else {
// temporal_aggregation_cte
if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, samplesTable, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
@@ -226,188 +214,18 @@ func (b *MetricQueryStatementBuilder) buildPipelineStatement(
}
}
var reducedFragments []string
var reducedArgs [][]any
if agg.Reduced && !useBuffer {
var tsCTE string
var tsArgs []any
if tsCTE, tsArgs, err = b.buildReducedTimeSeriesCTE(ctx, start, end, query, keys, variables); err != nil {
return nil, err
}
if temporalFrag, temporalArgs, ok := b.buildReducedTemporalAggregationCTE(start, end, query, tsCTE, tsArgs); ok {
spatialFrag, spatialArgs := b.buildReducedSpatialAggregationCTE(query)
reducedFragments = []string{temporalFrag, spatialFrag}
reducedArgs = [][]any{temporalArgs, spatialArgs}
}
}
// reset the query to the original state
query.Aggregations[0].SpaceAggregation = origSpaceAgg
query.Aggregations[0].TimeAggregation = origTimeAgg
query.GroupBy = origGroupBy
mainStmt, err := b.BuildFinalSelect(cteFragments, cteArgs, query)
if err != nil {
return nil, err
}
if reducedFragments == nil {
return mainStmt, nil
}
reducedStmt, err := b.BuildFinalSelect(reducedFragments, reducedArgs, query)
if err != nil {
return nil, err
}
return unionStatements(mainStmt, reducedStmt, query)
}
func unionStatements(main, reduced *qbtypes.Statement, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) (*qbtypes.Statement, error) {
orderBy := "ts"
for _, g := range query.GroupBy {
orderBy = fmt.Sprintf("`%s`, ", g.Name) + orderBy
}
q := fmt.Sprintf("SELECT * FROM (%s) UNION ALL SELECT * FROM (%s) ORDER BY %s", main.Query, reduced.Query, orderBy)
args := append(append([]any{}, main.Args...), reduced.Args...)
warnings := append(append([]string{}, main.Warnings...), reduced.Warnings...)
return &qbtypes.Statement{Query: q, Args: args, Warnings: warnings}, nil
}
func (b *MetricQueryStatementBuilder) buildReducedTimeSeriesCTE(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (string, []any, error) {
sb := sqlbuilder.NewSelectBuilder()
var preparedWhereClause querybuilder.PreparedWhereClause
var err error
if query.Filter != nil && query.Filter.Expression != "" {
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Context: ctx,
Logger: b.logger,
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
StartNs: start,
EndNs: end,
})
if err != nil {
return "", nil, err
}
}
sb.From(fmt.Sprintf("%s.%s", DBName, TimeseriesV4ReducedTableName))
sb.Select("fingerprint")
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
if err != nil {
return "", nil, err
}
sb.SelectMore(col)
}
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
sb.LTE("unix_milli", end),
sb.EQ("__normalized", false),
)
if !preparedWhereClause.IsEmpty() {
sb.AddWhereClause(preparedWhereClause.WhereClause)
}
sb.GroupBy("fingerprint")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("(%s) AS filtered_time_series", q), args, nil
}
func (b *MetricQueryStatementBuilder) buildReducedTemporalAggregationCTE(
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, bool) {
agg := query.Aggregations[0]
stepSec := int64(query.StepInterval.Seconds())
value, weight, ok := ReducedValueColumn(agg.Type, agg.SpaceAggregation)
if !ok {
return "", nil, false
}
// dedup recomputed buckets: latest computed_at wins per (series, 60s bucket)
dedup := sqlbuilder.NewSelectBuilder()
dedup.Select("reduced_fingerprint AS fingerprint", "unix_milli")
dedup.SelectMore(fmt.Sprintf("argMax(%s, computed_at) AS value", value))
if weight != "" {
dedup.SelectMore(fmt.Sprintf("argMax(%s, computed_at) AS weight", weight))
}
dedup.From(fmt.Sprintf("%s.%s", DBName, WhichReducedSamplesTableToUse(agg.Type)))
dedup.Where(
dedup.In("metric_name", agg.MetricName),
dedup.GTE("unix_milli", start),
dedup.LT("unix_milli", end),
)
dedup.GroupBy("reduced_fingerprint", "unix_milli")
dedupQuery, dedupArgs := dedup.BuildWithFlavor(sqlbuilder.ClickHouse)
sb := sqlbuilder.NewSelectBuilder()
sb.Select("fingerprint")
sb.SelectMore(fmt.Sprintf("toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts", stepSec))
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.Name))
}
sb.SelectMore(fmt.Sprintf("%s AS per_series_value", ReducedTimeAggregationColumn(agg.TimeAggregation, stepSec)))
if weight != "" {
// count_series is a series count, not additive over time, so the avg
// denominator is reduced with avg
sb.SelectMore("avg(weight) AS per_series_weight")
}
sb.From(fmt.Sprintf("(%s) AS points", dedupQuery))
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
sb.GroupBy("fingerprint", "ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
initArgs := append(append([]any{}, dedupArgs...), timeSeriesCTEArgs...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, initArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, true
}
func (b *MetricQueryStatementBuilder) buildReducedSpatialAggregationCTE(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
) (string, []any) {
spatial := "sum(per_series_value)"
switch query.Aggregations[0].SpaceAggregation {
case metrictypes.SpaceAggregationAvg:
spatial = "sum(per_series_value) / sum(per_series_weight)"
case metrictypes.SpaceAggregationMin:
spatial = "min(per_series_value)"
case metrictypes.SpaceAggregationMax:
spatial = "max(per_series_value)"
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ts")
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.Name))
}
sb.SelectMore(spatial + " AS value")
sb.From("__temporal_aggregation_cte")
sb.GroupBy("ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
// final SELECT
return b.BuildFinalSelect(cteFragments, cteArgs, query)
}
func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
samplesTable string,
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
@@ -424,7 +242,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
}
aggCol, err := AggregationColumnForSamplesTable(
samplesTable, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation,
start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality,
query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints,
)
if err != nil {
return "", nil, err
@@ -441,7 +260,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
sb.SelectMore(fmt.Sprintf("%s AS value", aggCol))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, samplesTable))
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
@@ -461,7 +281,6 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
tsTable string,
) (string, []any, error) {
sb := sqlbuilder.NewSelectBuilder()
@@ -485,7 +304,8 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
}
}
sb.From(fmt.Sprintf("%s.%s", DBName, tsTable))
start, end, _, tbl := WhichTSTableToUse(start, end, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s", DBName, tbl))
sb.Select("fingerprint")
for _, g := range query.GroupBy {
@@ -511,12 +331,6 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
sb.EQ("__normalized", false),
)
// the buffer holds both raw rows and the reduced catalog rows; the raw read
// only wants the original series
if tsTable == TimeseriesV4BufferLocalTableName {
sb.Where(sb.EQ("is_reduced", false))
}
if !preparedWhereClause.IsEmpty() {
sb.AddWhereClause(preparedWhereClause.WhereClause)
}
@@ -533,23 +347,21 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
_ map[string][]*telemetrytypes.TelemetryFieldKey,
samplesTable string,
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
if query.Aggregations[0].Temporality == metrictypes.Delta {
return b.buildTemporalAggDelta(ctx, start, end, query, samplesTable, timeSeriesCTE, timeSeriesCTEArgs)
return b.buildTemporalAggDelta(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
} else if query.Aggregations[0].Temporality != metrictypes.Multiple {
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, samplesTable, timeSeriesCTE, timeSeriesCTEArgs)
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
return b.buildTemporalAggForMultipleTemporalities(ctx, start, end, query, samplesTable, timeSeriesCTE, timeSeriesCTEArgs)
return b.buildTemporalAggForMultipleTemporalities(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
samplesTable string,
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
@@ -566,7 +378,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
sb.SelectMore(fmt.Sprintf("`%s`", g.Name))
}
aggCol, err := AggregationColumnForSamplesTable(samplesTable, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation)
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
@@ -577,7 +389,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, samplesTable))
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
@@ -596,7 +409,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
samplesTable string,
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
@@ -612,13 +424,14 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
baseSb.SelectMore(fmt.Sprintf("`%s`", g.Name))
}
aggCol, err := AggregationColumnForSamplesTable(samplesTable, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation)
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, samplesTable))
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
baseSb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
baseSb.Where(
baseSb.In("metric_name", query.Aggregations[0].MetricName),
@@ -662,7 +475,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
samplesTable string,
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
@@ -677,11 +489,11 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
sb.SelectMore(fmt.Sprintf("`%s`", g.Name))
}
aggForDeltaTemporality, err := AggregationColumnForSamplesTable(samplesTable, metrictypes.Delta, query.Aggregations[0].TimeAggregation)
aggForDeltaTemporality, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Delta, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
aggForCumulativeTemporality, err := AggregationColumnForSamplesTable(samplesTable, metrictypes.Cumulative, query.Aggregations[0].TimeAggregation)
aggForCumulativeTemporality, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Cumulative, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
@@ -709,7 +521,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
sb.SelectMore(expr)
}
sb.From(fmt.Sprintf("%s.%s AS points", DBName, samplesTable))
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),

View File

@@ -30,17 +30,6 @@ const (
TimeseriesV41weekLocalTableName = "time_series_v4_1week"
AttributesMetadataTableName = "distributed_metadata"
AttributesMetadataLocalTableName = "metadata"
// The buffer holds raw points for ~24h; the reduced tables hold 60s
// aggregates of dropped-label series.
SamplesV4BufferTableName = "distributed_samples_v4_buffer"
SamplesV4BufferLocalTableName = "samples_v4_buffer"
TimeseriesV4BufferTableName = "distributed_time_series_v4_buffer"
TimeseriesV4BufferLocalTableName = "time_series_v4_buffer"
SamplesV4ReducedLastTableName = "distributed_samples_v4_reduced_last_60s"
SamplesV4ReducedSumTableName = "distributed_samples_v4_reduced_sum_60s"
TimeseriesV4ReducedTableName = "distributed_time_series_v4_reduced"
TimeseriesV4ReducedLocalTableName = "time_series_v4_reduced"
)
var (
@@ -60,16 +49,8 @@ var (
// in that order.
func WhichTSTableToUse(
start, end uint64,
useBuffer bool,
tableHints *metrictypes.MetricTableHints,
) (uint64, uint64, string, string) {
// the buffer holds the recent raw window for reduced metrics and has the same
// shape as time_series_v4; round the start to the hour like the v4 table.
if useBuffer {
start = start - (start % (oneHourInMilliseconds))
return start, end, TimeseriesV4BufferTableName, TimeseriesV4BufferLocalTableName
}
// if we have a hint for the table, we need to use it
// the hint will be used to override the default table selection logic
if tableHints != nil {
@@ -168,20 +149,14 @@ func WhichSamplesTableToUse(
start, end uint64,
metricType metrictypes.Type,
timeAggregation metrictypes.TimeAggregation,
useBuffer bool,
tableHints *metrictypes.MetricTableHints,
) (string, string) {
// the buffer holds the recent raw window for reduced metrics; same shape as samples_v4
if useBuffer {
return SamplesV4BufferTableName, SamplesV4BufferLocalTableName
}
// if we have a hint for the table, we need to use it
// the hint will be used to override the default table selection logic.
// SamplesTableName is the distributed name; derive the local via switch.
if tableHints != nil && tableHints.SamplesTableName != "" {
switch tableHints.SamplesTableName {
case SamplesV4TableName, SamplesV4BufferTableName:
case SamplesV4TableName:
return SamplesV4TableName, SamplesV4LocalTableName
case SamplesV4Agg5mTableName:
return SamplesV4Agg5mTableName, SamplesV4Agg5mLocalTableName
@@ -213,10 +188,13 @@ func WhichSamplesTableToUse(
}
func AggregationColumnForSamplesTable(
tableName string,
start, end uint64,
metricType metrictypes.Type,
temporality metrictypes.Temporality,
timeAggregation metrictypes.TimeAggregation,
tableHints *metrictypes.MetricTableHints,
) (string, error) {
tableName, _ := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
var aggregationColumn string
switch temporality {
case metrictypes.Delta:
@@ -224,7 +202,7 @@ func AggregationColumnForSamplesTable(
// although it doesn't make sense to use anyLast, avg, min, max, count on delta metrics,
// we are keeping it here to make sure that query will not be invalid
switch tableName {
case SamplesV4TableName, SamplesV4BufferTableName:
case SamplesV4TableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
@@ -266,7 +244,7 @@ func AggregationColumnForSamplesTable(
// for cumulative metrics, we only support `RATE`/`INCREASE`. The max value in window is
// used to calculate the sum which is then divided by the window size to get the rate
switch tableName {
case SamplesV4TableName, SamplesV4BufferTableName:
case SamplesV4TableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
@@ -306,7 +284,7 @@ func AggregationColumnForSamplesTable(
}
case metrictypes.Unspecified:
switch tableName {
case SamplesV4TableName, SamplesV4BufferTableName:
case SamplesV4TableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
@@ -354,65 +332,6 @@ func AggregationColumnForSamplesTable(
return aggregationColumn, nil
}
// WhichReducedSamplesTableToUse returns the 60s reduced samples table for a metric
// type: the last_60s table for gauge-like series, the sum_60s table for counters
// and histograms.
func WhichReducedSamplesTableToUse(metricType metrictypes.Type) string {
if metricType == metrictypes.SumType || metricType == metrictypes.HistogramType {
return SamplesV4ReducedSumTableName
}
return SamplesV4ReducedLastTableName
}
// ReducedValueColumn returns the reduced value column (and the avg-denominator
// weight) for a space aggregation. The reduced columns are pre-aggregated across
// the original series, so the space aggregation picks the underlying value; the
// sum table only has `sum`, so min/max across series have no column (ok=false).
func ReducedValueColumn(metricType metrictypes.Type, space metrictypes.SpaceAggregation) (value, weight string, ok bool) {
if metricType == metrictypes.SumType || metricType == metrictypes.HistogramType {
switch space {
case metrictypes.SpaceAggregationSum:
return "`sum`", "", true
case metrictypes.SpaceAggregationAvg:
return "`sum`", "`count_series`", true
}
return "", "", false
}
switch space {
case metrictypes.SpaceAggregationSum:
return "`sum_last`", "", true
case metrictypes.SpaceAggregationAvg:
return "`sum_last`", "`count_series`", true
case metrictypes.SpaceAggregationMin:
return "`min`", "", true
case metrictypes.SpaceAggregationMax:
return "`max`", "", true
}
return "", "", false
}
// ReducedTimeAggregationColumn applies the time aggregation to the reduced `value`
// column over the step's 60s buckets. latest uses argMax over the bucket timestamp
// (the buckets have no read order); rate divides the per-step sum by the step.
func ReducedTimeAggregationColumn(timeAggregation metrictypes.TimeAggregation, stepSec int64) string {
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
return "argMax(value, unix_milli)"
case metrictypes.TimeAggregationAvg:
return "avg(value)"
case metrictypes.TimeAggregationMin:
return "min(value)"
case metrictypes.TimeAggregationMax:
return "max(value)"
case metrictypes.TimeAggregationCount:
return "count(value)"
case metrictypes.TimeAggregationRate:
return fmt.Sprintf("sum(value) / %d", stepSec)
default: // sum, increase
return "sum(value)"
}
}
func AggregationQueryForHistogramCountWithParams(param *metrictypes.ComparisonSpaceAggregationParam) (string, error) {
if param == nil {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "no aggregation param provided for histogram count")

View File

@@ -480,9 +480,7 @@ type MetricAggregation struct {
// value filter to apply to the query
ValueFilter *metrictypes.MetricValueFilter `json:"-"`
// reduce to operator for metric scalar requests
ReduceTo ReduceTo `json:"reduceTo,omitzero"`
Reduced bool `json:"-"`
ReduceTo ReduceTo `json:"reduceTo,omitempty"`
}
// Copy creates a deep copy of MetricAggregation.

View File

@@ -0,0 +1,151 @@
package spantypes
import (
"github.com/SigNoz/signoz/pkg/errors"
"go.opentelemetry.io/collector/pdata/ptrace"
)
var (
ErrCodeProcessorFactoryMapFailed = errors.MustNewCode("processor_factory_map_failed")
ErrCodeSpanMapperSimulationFailed = errors.MustNewCode("span_mapper_simulation_failed")
)
// spanInputOrderAttr tags each input span with its index so the simulator
// output can be sorted back into input order. The collector simulator does
// not guarantee that traces come out in the order they went in.
const spanInputOrderAttr = "__signoz_input_idx__"
// SimulateSpanMappersProcessing runs the given spans through an in-memory
// collector pipeline that hosts signozspanmapperprocessor configured by the
// supplied groups, and returns the transformed spans. Mirrors
// SimulatePipelinesProcessing in pkg/query-service/app/logparsingpipeline.
// func SimulateSpanMappersProcessing(
// ctx context.Context,
// groups []*SpanMapperGroupWithMappers,
// spans []SpanMapperTestSpan,
// ) (
// []SpanMapperTestSpan, []string, error,
// ) {
// enabled := filterEnabledGroupsWithMappers(groups)
// if len(enabled) < 1 {
// return spans, nil, nil
// }
// for i := range spans {
// if spans[i].Attributes == nil {
// spans[i].Attributes = map[string]any{}
// }
// spans[i].Attributes[spanInputOrderAttr] = int64(i)
// }
// simulatorInput := SpansToPTraces(spans)
// processorFactories, err := otelcol.MakeFactoryMap(signozspanmapperprocessor.NewFactory())
// if err != nil {
// return nil, nil, errors.WrapInternalf(err, ErrCodeProcessorFactoryMapFailed, "could not construct processor factory map")
// }
// configGenerator := func(baseConf []byte) ([]byte, error) {
// return GenerateCollectorConfigWithSpanMapperProcessor(baseConf, enabled)
// }
// // signozspanmapperprocessor does no batching; spans flow through immediately.
// timeout := 200 * time.Millisecond
// outputTraces, collectorErrs, simErr := collectorsimulator.SimulateTracesProcessing(
// ctx,
// processorFactories,
// configGenerator,
// simulatorInput,
// timeout,
// )
// if simErr != nil {
// if errors.Is(simErr, collectorsimulator.ErrInvalidConfig) {
// return nil, nil, errors.WrapInvalidInputf(simErr, errors.CodeInvalidInput, "invalid config")
// }
// return nil, nil, errors.WrapInternalf(simErr, ErrCodeSpanMapperSimulationFailed, "could not simulate span mapper processing")
// }
// outputSpans := PTracesToSpans(outputTraces)
// sort.Slice(outputSpans, func(i, j int) bool {
// iIdx, _ := outputSpans[i].Attributes[spanInputOrderAttr].(int64)
// jIdx, _ := outputSpans[j].Attributes[spanInputOrderAttr].(int64)
// return iIdx < jIdx
// })
// for _, s := range outputSpans {
// delete(s.Attributes, spanInputOrderAttr)
// }
// collectorWarnAndErrorLogs := []string{}
// for _, log := range collectorErrs {
// if log == "" || strings.Contains(log, "featuregate.go") {
// continue
// }
// collectorWarnAndErrorLogs = append(collectorWarnAndErrorLogs, log)
// }
// return outputSpans, collectorWarnAndErrorLogs, nil
// }
// SpansToPTraces packs each input span into its own ptrace.Traces with one
// ResourceSpans / ScopeSpans / Span carrying its attribute and resource maps.
func SpansToPTraces(spans []SpanMapperTestSpan) []ptrace.Traces {
result := make([]ptrace.Traces, 0, len(spans))
for _, s := range spans {
td := ptrace.NewTraces()
rs := td.ResourceSpans().AppendEmpty()
if s.Resource != nil {
_ = rs.Resource().Attributes().FromRaw(s.Resource)
}
sl := rs.ScopeSpans().AppendEmpty()
span := sl.Spans().AppendEmpty()
if s.Attributes != nil {
_ = span.Attributes().FromRaw(s.Attributes)
}
result = append(result, td)
}
return result
}
// PTracesToSpans flattens simulator output back into SpanMapperTestSpan: one
// entry per individual Span across all ResourceSpans / ScopeSpans.
func PTracesToSpans(traces []ptrace.Traces) []SpanMapperTestSpan {
result := []SpanMapperTestSpan{}
for _, td := range traces {
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
resourceAttrs := rs.Resource().Attributes().AsRaw()
ilss := rs.ScopeSpans()
for j := 0; j < ilss.Len(); j++ {
spans := ilss.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
result = append(result, SpanMapperTestSpan{
Attributes: spans.At(k).Attributes().AsRaw(),
Resource: resourceAttrs,
})
}
}
}
}
return result
}
func filterEnabledGroupsWithMappers(groups []*SpanMapperGroupWithMappers) []*SpanMapperGroupWithMappers {
out := make([]*SpanMapperGroupWithMappers, 0, len(groups))
for _, gm := range groups {
if gm == nil || gm.Group == nil || !gm.Group.Enabled {
continue
}
enabled := make([]*SpanMapper, 0, len(gm.Mappers))
for _, m := range gm.Mappers {
if m != nil && m.Enabled {
enabled = append(enabled, m)
}
}
if len(enabled) > 0 {
out = append(out, &SpanMapperGroupWithMappers{Group: gm.Group, Mappers: enabled})
}
}
return out
}

View File

@@ -0,0 +1,53 @@
package spantypes
import (
"github.com/SigNoz/signoz/pkg/valuer"
)
type SpanMapperTestSpan struct {
Attributes map[string]any `json:"attributes"`
Resource map[string]any `json:"resource"`
}
// Mappers is optional because the module can backfill from the store by Group.Name.
type PostableSpanMapperTestGroup struct {
PostableSpanMapperGroup
Mappers []PostableSpanMapper `json:"mappers"`
}
type PostableSpanMapperTest struct {
Spans []SpanMapperTestSpan `json:"spans" required:"true"`
Groups []PostableSpanMapperTestGroup `json:"groups" required:"true"`
}
type GettableSpanMapperTest struct {
Spans []SpanMapperTestSpan `json:"spans" required:"true"`
}
func NewSpanMapperGroupsWithMappersFromPostable(orgID valuer.UUID, in []PostableSpanMapperTestGroup) []*SpanMapperGroupWithMappers {
out := make([]*SpanMapperGroupWithMappers, 0, len(in))
for _, pg := range in {
var mappers []*SpanMapper
if pg.Mappers != nil {
mappers = make([]*SpanMapper, 0, len(pg.Mappers))
for _, pm := range pg.Mappers {
mappers = append(mappers, &SpanMapper{
Name: pm.Name,
FieldContext: pm.FieldContext,
Config: pm.Config,
Enabled: pm.Enabled,
})
}
}
out = append(out, &SpanMapperGroupWithMappers{
Group: &SpanMapperGroup{
OrgID: orgID,
Name: pg.Name,
Condition: pg.Condition,
Enabled: pg.Enabled,
},
Mappers: mappers,
})
}
return out
}

View File

@@ -4,7 +4,6 @@ import (
"fmt"
"strings"
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -22,7 +21,7 @@ const (
// BodyJSONStringSearchPrefix is the prefix used for body JSON search queries.
// e.g., "body.status" where "body." is the prefix.
BodyJSONStringSearchPrefix = "body."
ArraySep = jsontypeexporter.ArraySeparator
ArraySep = "[]."
ArraySepSuffix = "[]"
// TODO(Piyush): Remove once we've migrated to the new array syntax.
ArrayAnyIndex = "[*]."

View File

@@ -6,7 +6,6 @@ import (
"slices"
"strings"
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -76,7 +75,7 @@ func (n *JSONAccessNode) Alias() string {
parentAlias := strings.TrimLeft(n.Parent.Alias(), "`")
parentAlias = strings.TrimRight(parentAlias, "`")
sep := jsontypeexporter.ArraySeparator
sep := "[]."
if n.Parent.isRoot {
sep = "."
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MetadataStore is the interface for the telemetry metadata store.
@@ -27,12 +26,12 @@ type MetadataStore interface {
GetAllValues(ctx context.Context, fieldValueSelector *FieldValueSelector) (*TelemetryFieldValues, bool, error)
// FetchTemporality fetches the temporality for metric
FetchTemporality(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error)
FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error)
// FetchTemporalityMulti fetches the temporality for multiple metrics
FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error)
FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error)
FetchTemporalityAndTypeMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, map[string]bool, error)
FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error)
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]TelemetryFieldKeySkipIndex, error)

View File

@@ -6,7 +6,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MockMetadataStore implements the MetadataStore interface for testing purposes.
@@ -17,7 +16,6 @@ type MockMetadataStore struct {
AllValuesMap map[string]*telemetrytypes.TelemetryFieldValues
TemporalityMap map[string]metrictypes.Temporality
TypeMap map[string]metrictypes.Type
ReducedMap map[string]bool
PromotedPathsMap map[string]bool
LogsJSONIndexes []telemetrytypes.TelemetryFieldKeySkipIndex
ColumnEvolutionMetadataMap map[string][]*telemetrytypes.EvolutionEntry
@@ -308,7 +306,7 @@ func (m *MockMetadataStore) SetAllValues(lookupKey string, values *telemetrytype
}
// FetchTemporality fetches the temporality for a metric.
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
if temporality, exists := m.TemporalityMap[metricName]; exists {
return temporality, nil
}
@@ -316,7 +314,7 @@ func (m *MockMetadataStore) FetchTemporality(ctx context.Context, orgID valuer.U
}
// FetchTemporalityMulti fetches the temporality for multiple metrics.
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
for _, metricName := range metricNames {
@@ -331,10 +329,9 @@ func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, orgID val
}
// FetchTemporalityMulti fetches the temporality for multiple metrics.
func (m *MockMetadataStore) FetchTemporalityAndTypeMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, map[string]bool, error) {
func (m *MockMetadataStore) FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error) {
temporalities := make(map[string]metrictypes.Temporality)
types := make(map[string]metrictypes.Type)
reduced := make(map[string]bool)
for _, metricName := range metricNames {
if temporality, exists := m.TemporalityMap[metricName]; exists {
@@ -347,12 +344,9 @@ func (m *MockMetadataStore) FetchTemporalityAndTypeMulti(ctx context.Context, or
} else {
types[metricName] = metrictypes.UnspecifiedType
}
if m.ReducedMap[metricName] {
reduced[metricName] = true
}
}
return temporalities, types, reduced, nil
return temporalities, types, nil
}
// SetTemporality sets the temporality for a metric in the mock store.