Compare commits

..

7 Commits

Author SHA1 Message Date
Srikanth Chekuri
b996b5812c Merge branch 'main' into issue_4967 2026-06-01 23:04:09 +05:30
Nikhil Soni
5e94f7ac6e feat(trace-details): Add API endpoint & module for trace aggregations (#11452)
Some checks are pending
build-staging / prepare (push) Waiting to run
build-staging / js-build (push) Blocked by required conditions
build-staging / go-build (push) Blocked by required conditions
build-staging / staging (push) Blocked by required conditions
Release Drafter / update_release_draft (push) Waiting to run
* feat: add endpoint & module for trace aggregations

* chore: use v1 for aggregations api

* chore: update openapi specs

* feat: add implementation for aggregation store

* chore: use query builder for count by field

* chore: remove support for aggregations on attributes

Only supporting resource fields for now, since this
is not user input but hard coded client experience

* chore: extract out inner query as cte

* chore: again extract out inner query

* chore: move end_ns computation to first cte for simplicity

* chore: use notEmpty function instead of having

* fix: type cast issue in sum function

* chore: use query builder for duration query as well

* chore: add tests for trace store sql

* chore: remove unnecessary column checks

* chore: format the expected sql queries

* fix: formating was breaking test

* fix: change import and remove formating from sql

* chore: remove formating from store impl as well

* fix: mark required fields as required

* fix: explicitly mark nullable false for required field

* fix: mark required fields in response as well
2026-06-01 15:32:09 +00:00
Gaurav Tewari
387ad06c2d fix: tag container styles in qb (#11503)
Co-authored-by: Gaurav Tewari <tewarig@users.noreply.github.com>
2026-06-01 14:09:06 +00:00
Nityananda Gohain
6603fe358f Merge branch 'main' into issue_4967 2026-05-19 16:02:34 +05:30
nityanandagohain
a6f97dbd89 fix: cleanup 2026-05-19 16:01:15 +05:30
nityanandagohain
bc2492ccec fix: more fixes 2026-05-19 15:41:53 +05:30
nityanandagohain
8afb78b17b fix: add adjustkeys in trace operator cte builder 2026-05-19 01:05:26 +05:30
21 changed files with 1094 additions and 148 deletions

View File

@@ -6525,6 +6525,15 @@ components:
required:
- items
type: object
SpantypesGettableTraceAggregations:
properties:
aggregations:
items:
$ref: '#/components/schemas/SpantypesSpanAggregationResult'
type: array
required:
- aggregations
type: object
SpantypesGettableWaterfallTrace:
properties:
aggregations:
@@ -6590,6 +6599,15 @@ components:
- name
- condition
type: object
SpantypesPostableTraceAggregations:
properties:
aggregations:
items:
$ref: '#/components/schemas/SpantypesSpanAggregation'
type: array
required:
- aggregations
type: object
SpantypesPostableWaterfall:
properties:
aggregations:
@@ -6614,6 +6632,9 @@ components:
$ref: '#/components/schemas/SpantypesSpanAggregationType'
field:
$ref: '#/components/schemas/TelemetrytypesTelemetryFieldKey'
required:
- field
- aggregation
type: object
SpantypesSpanAggregationResult:
properties:
@@ -6627,6 +6648,10 @@ components:
type: integer
nullable: true
type: object
required:
- field
- aggregation
- value
type: object
SpantypesSpanAggregationType:
enum:
@@ -12265,6 +12290,75 @@ paths:
summary: Test notification channel (deprecated)
tags:
- channels
/api/v1/traces/{traceID}/aggregations:
post:
deprecated: false
description: Computes span aggregations grouped by requested field.
operationId: GetTraceAggregations
parameters:
- in: path
name: traceID
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesPostableTraceAggregations'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesGettableTraceAggregations'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get aggregations for a trace
tags:
- tracedetail
/api/v1/user:
get:
deprecated: false

View File

@@ -7753,12 +7753,19 @@ export type SpantypesSpanAggregationResultDTOValue =
SpantypesSpanAggregationResultDTOValueAnyOf | null;
export interface SpantypesSpanAggregationResultDTO {
aggregation?: SpantypesSpanAggregationTypeDTO;
field?: TelemetrytypesTelemetryFieldKeyDTO;
aggregation: SpantypesSpanAggregationTypeDTO;
field: TelemetrytypesTelemetryFieldKeyDTO;
/**
* @type object,null
*/
value?: SpantypesSpanAggregationResultDTOValue;
value: SpantypesSpanAggregationResultDTOValue;
}
export interface SpantypesGettableTraceAggregationsDTO {
/**
* @type array
*/
aggregations: SpantypesSpanAggregationResultDTO[];
}
export type SpantypesWaterfallSpanDTOAttributesAnyOf = {
@@ -8000,8 +8007,15 @@ export interface SpantypesPostableSpanMapperGroupDTO {
}
export interface SpantypesSpanAggregationDTO {
aggregation?: SpantypesSpanAggregationTypeDTO;
field?: TelemetrytypesTelemetryFieldKeyDTO;
aggregation: SpantypesSpanAggregationTypeDTO;
field: TelemetrytypesTelemetryFieldKeyDTO;
}
export interface SpantypesPostableTraceAggregationsDTO {
/**
* @type array
*/
aggregations: SpantypesSpanAggregationDTO[];
}
export interface SpantypesPostableWaterfallDTO {
@@ -9344,6 +9358,17 @@ export type UpdateSpanMapperPathParameters = {
groupId: string;
mapperId: string;
};
export type GetTraceAggregationsPathParameters = {
traceID: string;
};
export type GetTraceAggregations200 = {
data: SpantypesGettableTraceAggregationsDTO;
/**
* @type string
*/
status: string;
};
export type ListUsersDeprecated200 = {
/**
* @type array

View File

@@ -12,17 +12,120 @@ import type {
} from 'react-query';
import type {
GetTraceAggregations200,
GetTraceAggregationsPathParameters,
GetWaterfall200,
GetWaterfallPathParameters,
GetWaterfallV4200,
GetWaterfallV4PathParameters,
RenderErrorResponseDTO,
SpantypesPostableTraceAggregationsDTO,
SpantypesPostableWaterfallDTO,
} from '../sigNoz.schemas';
import { GeneratedAPIInstance } from '../../../generatedAPIInstance';
import type { ErrorType, BodyType } from '../../../generatedAPIInstance';
/**
* Computes span aggregations grouped by requested field.
* @summary Get aggregations for a trace
*/
export const getTraceAggregations = (
{ traceID }: GetTraceAggregationsPathParameters,
spantypesPostableTraceAggregationsDTO?: BodyType<SpantypesPostableTraceAggregationsDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<GetTraceAggregations200>({
url: `/api/v1/traces/${traceID}/aggregations`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: spantypesPostableTraceAggregationsDTO,
signal,
});
};
export const getGetTraceAggregationsMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof getTraceAggregations>>,
TError,
{
pathParams: GetTraceAggregationsPathParameters;
data?: BodyType<SpantypesPostableTraceAggregationsDTO>;
},
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof getTraceAggregations>>,
TError,
{
pathParams: GetTraceAggregationsPathParameters;
data?: BodyType<SpantypesPostableTraceAggregationsDTO>;
},
TContext
> => {
const mutationKey = ['getTraceAggregations'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof getTraceAggregations>>,
{
pathParams: GetTraceAggregationsPathParameters;
data?: BodyType<SpantypesPostableTraceAggregationsDTO>;
}
> = (props) => {
const { pathParams, data } = props ?? {};
return getTraceAggregations(pathParams, data);
};
return { mutationFn, ...mutationOptions };
};
export type GetTraceAggregationsMutationResult = NonNullable<
Awaited<ReturnType<typeof getTraceAggregations>>
>;
export type GetTraceAggregationsMutationBody =
| BodyType<SpantypesPostableTraceAggregationsDTO>
| undefined;
export type GetTraceAggregationsMutationError =
ErrorType<RenderErrorResponseDTO>;
/**
* @summary Get aggregations for a trace
*/
export const useGetTraceAggregations = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof getTraceAggregations>>,
TError,
{
pathParams: GetTraceAggregationsPathParameters;
data?: BodyType<SpantypesPostableTraceAggregationsDTO>;
},
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof getTraceAggregations>>,
TError,
{
pathParams: GetTraceAggregationsPathParameters;
data?: BodyType<SpantypesPostableTraceAggregationsDTO>;
},
TContext
> => {
return useMutation(getGetTraceAggregationsMutationOptions(options));
};
/**
* Returns the waterfall view of spans for a given trace ID with tree structure, metadata, and windowed pagination
* @summary Get waterfall view for a trace

View File

@@ -56,7 +56,7 @@ function MetricDetails({
);
const metadata = useMemo(() => {
if (!metricMetadataResponse?.data) {
if (!metricMetadataResponse) {
return null;
}
const { type, description, unit, temporality, isMonotonic } =

View File

@@ -9,6 +9,19 @@
width: 0.2rem;
height: 0.2rem;
}
.option-value {
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
min-width: 0;
}
.option-meta-data-container {
display: flex;
gap: 8px;
flex-shrink: 0;
}
}
.option-renderer-tooltip {

View File

@@ -24,7 +24,7 @@ export const StyledCheckOutlined = styled(Check)`
export const TagContainer = styled(Badge)`
&&& {
display: inline-block;
display: flex;
border-radius: 3px;
padding: 0.1rem 0.2rem;
font-weight: 300;

View File

@@ -48,5 +48,24 @@ func (provider *provider) addTraceDetailRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v1/traces/{traceID}/aggregations", handler.New(
provider.authzMiddleware.ViewAccess(provider.traceDetailHandler.GetTraceAggregations),
handler.OpenAPIDef{
ID: "GetTraceAggregations",
Tags: []string{"tracedetail"},
Summary: "Get aggregations for a trace",
Description: "Computes span aggregations grouped by requested field.",
Request: new(spantypes.PostableTraceAggregations),
RequestContentType: "application/json",
Response: new(spantypes.GettableTraceAggregations),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -59,3 +59,24 @@ func (h *handler) GetWaterfallV4(rw http.ResponseWriter, r *http.Request) {
render.Success(rw, http.StatusOK, result)
}
func (h *handler) GetTraceAggregations(rw http.ResponseWriter, r *http.Request) {
req := new(spantypes.PostableTraceAggregations)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
if err := req.Validate(); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.GetTraceAggregations(r.Context(), mux.Vars(r)["traceID"], req)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}

View File

@@ -105,6 +105,49 @@ func (m *module) getFullWaterfall(ctx context.Context, traceID string, summary *
return spantypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, nil, true, nil), nil
}
func (m *module) GetTraceAggregations(ctx context.Context, traceID string, req *spantypes.PostableTraceAggregations) (*spantypes.GettableTraceAggregations, error) {
summary, err := m.store.GetTraceSummary(ctx, traceID)
if err != nil {
return nil, err
}
traceDurationNs := uint64(summary.End.UnixNano()) - uint64(summary.Start.UnixNano())
results := make([]spantypes.SpanAggregationResult, 0, len(req.Aggregations))
for _, agg := range req.Aggregations {
result := spantypes.SpanAggregationResult{Field: agg.Field, Aggregation: agg.Aggregation}
switch agg.Aggregation {
case spantypes.SpanAggregationSpanCount:
result.Value, err = m.store.GetSpanCountByField(ctx, traceID, summary, agg.Field)
if err != nil {
return nil, err
}
case spantypes.SpanAggregationDuration:
durationNs, err2 := m.store.GetSpanDurationByField(ctx, traceID, summary, agg.Field)
if err2 != nil {
return nil, err2
}
result.Value = make(map[string]uint64, len(durationNs))
for k, ns := range durationNs {
result.Value[k] = ns / 1_000_000
}
case spantypes.SpanAggregationExecutionTimePercentage:
durationNs, err2 := m.store.GetSpanDurationByField(ctx, traceID, summary, agg.Field)
if err2 != nil {
return nil, err2
}
result.Value = make(map[string]uint64, len(durationNs))
if traceDurationNs > 0 {
for k, ns := range durationNs {
result.Value[k] = ns * 100 / traceDurationNs
}
}
}
results = append(results, result)
}
return &spantypes.GettableTraceAggregations{Aggregations: results}, nil
}
// getWindowedWaterfall builds the waterfall tree with minimal data and then returns only a window of full spans.
func (m *module) getWindowedWaterfall(ctx context.Context, traceID, selectedSpanID string, uncollapsedSpans []string, start, end time.Time) (*spantypes.GettableWaterfallTrace, error) {
// Step 1: minimal fetch → build full tree → select visible window

View File

@@ -11,10 +11,30 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const colServiceName = `resource_string_service$$$$name` // $ gets escaped so $$$$ converts to $$.
func buildFieldExpr(fieldKey telemetrytypes.TelemetryFieldKey) (string, error) {
switch fieldKey.FieldContext {
case telemetrytypes.FieldContextResource:
// String cast required — Variant/Dynamic is rejected by GROUP BY.
return fmt.Sprintf("resource.`%s`::String", fieldKey.Name), nil
}
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported field context: %v", fieldKey.FieldContext)
}
type spanCountRow struct {
FieldValue string `ch:"field_value"`
Count uint64 `ch:"count"`
}
type spanDurationRow struct {
FieldValue string `ch:"field_value"`
TotalNs uint64 `ch:"total_ns"`
}
type traceStore struct {
telemetryStore telemetrystore.TelemetryStore
}
@@ -133,3 +153,85 @@ func (s *traceStore) GetTraceSpansByIDs(ctx context.Context, traceID string, sta
}
return spans, nil
}
func (s *traceStore) GetSpanCountByField(ctx context.Context, traceID string, summary *spantypes.TraceSummary, fieldKey telemetrytypes.TelemetryFieldKey) (map[string]uint64, error) {
fieldExpr, err := buildFieldExpr(fieldKey)
if err != nil {
return nil, err
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select(fieldExpr+" AS field_value", "count(DISTINCT span_id) AS count")
sb.From(fmt.Sprintf("%s.%s", spantypes.TraceDB, spantypes.TraceTable))
sb.Where(
sb.E("trace_id", traceID),
sb.GE("ts_bucket_start", summary.Start.Unix()-1800),
sb.LE("ts_bucket_start", summary.End.Unix()),
"notEmpty("+fieldExpr+")",
)
sb.GroupBy("field_value")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var rows []spanCountRow
if err := s.telemetryStore.ClickhouseDB().Select(ctx, &rows, query, args...); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error querying span count by field")
}
result := make(map[string]uint64, len(rows))
for _, r := range rows {
result[r.FieldValue] = r.Count
}
return result, nil
}
func (s *traceStore) GetSpanDurationByField(ctx context.Context, traceID string, summary *spantypes.TraceSummary, fieldKey telemetrytypes.TelemetryFieldKey) (map[string]uint64, error) {
fieldExpr, err := buildFieldExpr(fieldKey)
if err != nil {
return nil, err
}
// CTE 1: all span with start and end timestamps.
allSpansSB := sqlbuilder.NewSelectBuilder()
allSpansSB.Select(
"DISTINCT ON (span_id) "+fieldExpr+" AS field_value",
"toUnixTimestamp64Nano(timestamp) AS start_ns",
"start_ns + duration_nano AS end_ns",
)
allSpansSB.From(fmt.Sprintf("%s.%s", spantypes.TraceDB, spantypes.TraceTable))
allSpansSB.Where(
allSpansSB.E("trace_id", traceID),
allSpansSB.GE("ts_bucket_start", summary.Start.Unix()-1800),
allSpansSB.LE("ts_bucket_start", summary.End.Unix()),
"notEmpty(field_value)",
)
allSpansSB.OrderByAsc("timestamp")
allSpansSB.OrderByAsc("name")
// CTE 2: find max end time of all preceding spans.
effectiveStartSB := sqlbuilder.NewSelectBuilder()
effectiveStartSB.Select(
"field_value", "end_ns",
"greatest(start_ns, ifNull(max(end_ns) OVER (PARTITION BY field_value ORDER BY start_ns ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), toUInt64(0))) AS effective_start_ns",
)
effectiveStartSB.From("all_spans")
// Final SELECT: each span contributes only the tail past its effective start.
sb := sqlbuilder.With(
sqlbuilder.CTEQuery("all_spans").As(allSpansSB),
sqlbuilder.CTEQuery("effective_start").As(effectiveStartSB),
).Select(
"field_value",
"sum(toUInt64(greatest(end_ns - effective_start_ns, 0))) AS total_ns",
)
sb.From("effective_start")
sb.GroupBy("field_value")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
var rows []spanDurationRow
if err := s.telemetryStore.ClickhouseDB().Select(ctx, &rows, query, args...); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error querying span duration by field")
}
result := make(map[string]uint64, len(rows))
for _, r := range rows {
result[r.FieldValue] = r.TotalNs
}
return result, nil
}

View File

@@ -0,0 +1,118 @@
package impltracedetail_test
import (
"context"
"regexp"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
cmock "github.com/SigNoz/clickhouse-go-mock"
"github.com/SigNoz/signoz/pkg/modules/tracedetail/impltracedetail"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/types/spantypes/spantypestest"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/stretchr/testify/assert"
)
var (
testTraceID = "trace-abc123"
testStart = time.Unix(1000, 0).UTC()
testEnd = time.Unix(2000, 0).UTC()
testSummary = &spantypes.TraceSummary{
TraceID: testTraceID,
Start: testStart,
End: testEnd,
NumSpans: 10,
}
svcNameField = telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
}
unsupportedField = telemetrytypes.TelemetryFieldKey{
Name: "http.method",
FieldContext: telemetrytypes.FieldContextSpan,
}
)
func newTestStore(matcher sqlmock.QueryMatcher) *spantypestest.TraceStoreTest {
ts := telemetrystoretest.New(telemetrystore.Config{}, matcher)
return spantypestest.New(impltracedetail.NewTraceStore(ts), ts.Mock())
}
func TestGetTraceSummary(t *testing.T) {
expectedSQL := "SELECT trace_id, min(start) AS start, max(end) AS end, sum(num_spans) AS num_spans FROM signoz_traces.distributed_trace_summary WHERE trace_id = ? GROUP BY trace_id"
t.Run("ValidTraceID_GeneratesExpectedSQL", func(t *testing.T) {
s := newTestStore(sqlmock.QueryMatcherRegexp)
s.Mock().ExpectQueryRow(regexp.QuoteMeta(expectedSQL)).
WillReturnRow(cmock.NewRow(nil, nil))
_, _ = s.Store().GetTraceSummary(context.Background(), testTraceID)
assert.NoError(t, s.Mock().ExpectationsWereMet())
})
}
func TestGetMinimalSpans(t *testing.T) {
expectedSQL := "SELECT DISTINCT ON (span_id) span_id, parent_span_id, timestamp, duration_nano, has_error, resource_string_service$$name FROM signoz_traces.distributed_signoz_index_v3 WHERE trace_id = ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp ASC, name ASC"
t.Run("ValidRange_GeneratesExpectedSQL", func(t *testing.T) {
s := newTestStore(sqlmock.QueryMatcherRegexp)
s.Mock().ExpectSelect(regexp.QuoteMeta(expectedSQL)).
WillReturnRows(cmock.NewRows(nil, nil))
_, _ = s.Store().GetMinimalSpans(context.Background(), testTraceID, testStart, testEnd)
assert.NoError(t, s.Mock().ExpectationsWereMet())
})
}
func TestGetSpanCountByField(t *testing.T) {
expectedSQL := "SELECT resource.`service.name`::String AS field_value, count(DISTINCT span_id) AS count FROM signoz_traces.distributed_signoz_index_v3 WHERE trace_id = ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND notEmpty(resource.`service.name`::String) GROUP BY field_value"
tests := []struct {
name string
field telemetrytypes.TelemetryFieldKey
wantQuery bool
}{
{name: "ResourceField_GeneratesExpectedSQL", field: svcNameField, wantQuery: true},
{name: "NonResourceField_NoSQLGenerated", field: unsupportedField},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := newTestStore(sqlmock.QueryMatcherRegexp)
if tc.wantQuery {
s.Mock().ExpectSelect(regexp.QuoteMeta(expectedSQL)).
WillReturnRows(cmock.NewRows(nil, nil))
}
_, _ = s.Store().GetSpanCountByField(context.Background(), testTraceID, testSummary, tc.field)
assert.NoError(t, s.Mock().ExpectationsWereMet())
})
}
}
func TestGetSpanDurationByField(t *testing.T) {
expectedSQL := "WITH all_spans AS (SELECT DISTINCT ON (span_id) resource.`service.name`::String AS field_value, toUnixTimestamp64Nano(timestamp) AS start_ns, start_ns + duration_nano AS end_ns FROM signoz_traces.distributed_signoz_index_v3 WHERE trace_id = ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND notEmpty(field_value) ORDER BY timestamp ASC, name ASC), effective_start AS (SELECT field_value, end_ns, greatest(start_ns, ifNull(max(end_ns) OVER (PARTITION BY field_value ORDER BY start_ns ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), toUInt64(0))) AS effective_start_ns FROM all_spans) SELECT field_value, sum(toUInt64(greatest(end_ns - effective_start_ns, 0))) AS total_ns FROM effective_start GROUP BY field_value"
tests := []struct {
name string
field telemetrytypes.TelemetryFieldKey
wantQuery bool
}{
{name: "ResourceField_GeneratesExpectedSQL", field: svcNameField, wantQuery: true},
{name: "NonResourceField_NoSQLGenerated", field: unsupportedField},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := newTestStore(sqlmock.QueryMatcherRegexp)
if tc.wantQuery {
s.Mock().ExpectSelect(regexp.QuoteMeta(expectedSQL)).
WillReturnRows(cmock.NewRows(nil, nil))
}
_, _ = s.Store().GetSpanDurationByField(context.Background(), testTraceID, testSummary, tc.field)
assert.NoError(t, s.Mock().ExpectationsWereMet())
})
}
}

View File

@@ -11,10 +11,12 @@ import (
type Handler interface {
GetWaterfall(http.ResponseWriter, *http.Request)
GetWaterfallV4(http.ResponseWriter, *http.Request)
GetTraceAggregations(http.ResponseWriter, *http.Request)
}
// Module defines the business logic for trace detail operations.
type Module interface {
GetWaterfall(ctx context.Context, traceID string, req *spantypes.PostableWaterfall) (*spantypes.GettableWaterfallTrace, error)
GetWaterfallV4(ctx context.Context, traceID string, selectedSpanID string, uncollapsedSpans []string, selectAllLimit uint) (*spantypes.GettableWaterfallTrace, error)
GetTraceAggregations(ctx context.Context, traceID string, req *spantypes.PostableTraceAggregations) (*spantypes.GettableTraceAggregations, error)
}

View File

@@ -124,7 +124,7 @@ func (b *traceQueryStatementBuilder) Build(
-------------------------------- End of tech debt ----------------------------
*/
query = b.adjustKeys(ctx, keys, query, requestType)
adjustTraceKeys(ctx, b.logger, keys, &query, requestType)
// Create SQL builder
q := sqlbuilder.NewSelectBuilder()
@@ -193,24 +193,30 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation])
return keySelectors
}
func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation] {
// add deprecated fields only during statement building
// why?
// 1. to not fail filter expression that use deprecated cols
// 2. this could have been moved to metadata fetching itself, however, that
// would mean, they also show up in suggestions we we don't want to do
// 3. reason for not doing a simple append is to keep intrinsic/calculated field first so that it gets
// priority in multi_if sql expression
// mergeDeprecatedTraceKeys prepends deprecated intrinsic/calculated trace field
// definitions to the keys map. We do this during statement building, not at
// metadata fetch time, because:
// 1. Filter expressions that reference deprecated columns must continue to
// resolve — otherwise they fail with "key not found".
// 2. Doing it at metadata fetch time would also surface deprecated keys in
// autocomplete suggestions, which we don't want.
// 3. We prepend (not append) so the intrinsic/calculated entry wins ordering
// in the multi_if SQL expression.
func mergeDeprecatedTraceKeys(keys map[string][]*telemetrytypes.TelemetryFieldKey) {
for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
}
for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
}
}
func adjustTraceKeys(ctx context.Context, logger *slog.Logger, keys map[string][]*telemetrytypes.TelemetryFieldKey, query *qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], requestType qbtypes.RequestType) {
mergeDeprecatedTraceKeys(keys)
// Adjust keys for alias expressions in aggregations
actions := querybuilder.AdjustKeysForAliasExpressions(&query, requestType)
actions := querybuilder.AdjustKeysForAliasExpressions(query, requestType)
/*
Check if user is using multiple contexts or data types for same field name
@@ -228,7 +234,7 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
and make it just http.status_code and remove the duplicate entry.
*/
actions = append(actions, querybuilder.AdjustDuplicateKeys(&query)...)
actions = append(actions, querybuilder.AdjustDuplicateKeys(query)...)
/*
Now adjust each key to have correct context and data type
@@ -236,24 +242,23 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
Reason for doing this is to not create an unexpected behavior for users
*/
for idx := range query.SelectFields {
actions = append(actions, b.adjustKey(&query.SelectFields[idx], keys)...)
actions = append(actions, adjustTraceKey(&query.SelectFields[idx], keys)...)
}
for idx := range query.GroupBy {
actions = append(actions, b.adjustKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
actions = append(actions, adjustTraceKey(&query.GroupBy[idx].TelemetryFieldKey, keys)...)
}
for idx := range query.Order {
actions = append(actions, b.adjustKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
actions = append(actions, adjustTraceKey(&query.Order[idx].Key.TelemetryFieldKey, keys)...)
}
for _, action := range actions {
// TODO: change to debug level once we are confident about the behavior
b.logger.InfoContext(ctx, "key adjustment action", slog.String("action", action))
logger.InfoContext(ctx, "key adjustment action", slog.String("action", action))
}
return query
}
func (b *traceQueryStatementBuilder) adjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
// adjustTraceKey resolves a single TelemetryFieldKey against the keys map.
func adjustTraceKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemetrytypes.TelemetryFieldKey) []string {
// for recording actions taken
actions := []string{}

View File

@@ -1125,28 +1125,13 @@ func TestAdjustKey(t *testing.T) {
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
aggExprRewriter,
nil,
fl,
)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
// Create a copy of the input key to avoid modifying the original
key := c.inputKey
// Call adjustKey
statementBuilder.adjustKey(&key, c.keysMap)
adjustTraceKey(&key, c.keysMap)
// Verify the key was adjusted as expected
require.Equal(t, c.expectedKey.Name, key.Name, "key name should match")
@@ -1424,7 +1409,7 @@ func TestAdjustKeys(t *testing.T) {
}
// Call adjustKeys
c.query = statementBuilder.adjustKeys(context.Background(), keysMapCopy, c.query, qbtypes.RequestTypeScalar)
adjustTraceKeys(context.Background(), statementBuilder.logger, keysMapCopy, &c.query, qbtypes.RequestTypeScalar)
// Verify select fields were adjusted
if c.expectedSelectFields != nil {

View File

@@ -216,6 +216,10 @@ func (b *traceOperatorCTEBuilder) buildQueryCTE(ctx context.Context, queryName s
}
b.stmtBuilder.logger.DebugContext(ctx, "Retrieved keys for query", slog.String("query_name", queryName), slog.Int("keys_count", len(keys)))
// The CTE only selects spans matching the filter. Aggregations, group by
// and order by run later in buildFinalQuery, so RequestTypeRaw is fine here.
adjustTraceKeys(ctx, b.stmtBuilder.logger, keys, query, qbtypes.RequestTypeRaw)
// Build resource filter CTE for this specific query
resourceFilterCTEName := fmt.Sprintf("__resource_filter_%s", cteName)
resourceStmt, err := b.buildResourceFilterCTE(ctx, *query)
@@ -417,21 +421,28 @@ func (b *traceOperatorCTEBuilder) buildNotCTE(leftCTE, rightCTE string) (string,
}
func (b *traceOperatorCTEBuilder) buildFinalQuery(ctx context.Context, selectFromCTE string, requestType qbtypes.RequestType) (*qbtypes.Statement, error) {
keySelectors := b.getKeySelectors()
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
b.adjustOperatorKeys(ctx, keys, requestType)
switch requestType {
case qbtypes.RequestTypeRaw:
return b.buildListQuery(ctx, selectFromCTE)
return b.buildListQuery(ctx, selectFromCTE, keys)
case qbtypes.RequestTypeTimeSeries:
return b.buildTimeSeriesQuery(ctx, selectFromCTE)
return b.buildTimeSeriesQuery(ctx, selectFromCTE, keys)
case qbtypes.RequestTypeTrace:
return b.buildTraceQuery(ctx, selectFromCTE)
return b.buildTraceQuery(ctx, selectFromCTE, keys)
case qbtypes.RequestTypeScalar:
return b.buildScalarQuery(ctx, selectFromCTE)
return b.buildScalarQuery(ctx, selectFromCTE, keys)
default:
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
}
}
func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
sb := sqlbuilder.NewSelectBuilder()
// Select core fields
@@ -453,22 +464,6 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
"parent_span_id": true,
}
// Get keys for selectFields
keySelectors := b.getKeySelectors()
for _, field := range b.operator.SelectFields {
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: field.Name,
Signal: telemetrytypes.SignalTraces,
FieldContext: field.FieldContext,
FieldDataType: field.FieldDataType,
})
}
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
// Add selectFields using ColumnExpressionFor since we now have all base table columns
for _, field := range b.operator.SelectFields {
if selectedFields[field.Name] {
@@ -518,6 +513,45 @@ func (b *traceOperatorCTEBuilder) buildListQuery(ctx context.Context, selectFrom
}, nil
}
// adjustOperatorKeys runs the same key adjustments as adjustTraceKeys, but on
// the operator's own fields. The operator has a different struct shape than
// QueryBuilderQuery, so we copy the relevant fields into a temp query, run
// the shared helpers, and copy the results back.
func (b *traceOperatorCTEBuilder) adjustOperatorKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, requestType qbtypes.RequestType) {
mergeDeprecatedTraceKeys(keys)
tmp := qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Aggregations: b.operator.Aggregations,
SelectFields: b.operator.SelectFields,
GroupBy: b.operator.GroupBy,
Order: b.operator.Order,
}
actions := querybuilder.AdjustKeysForAliasExpressions(&tmp, requestType)
actions = append(actions, querybuilder.AdjustDuplicateKeys(&tmp)...)
for idx := range tmp.SelectFields {
actions = append(actions, adjustTraceKey(&tmp.SelectFields[idx], keys)...)
}
for idx := range tmp.GroupBy {
actions = append(actions, adjustTraceKey(&tmp.GroupBy[idx].TelemetryFieldKey, keys)...)
}
for idx := range tmp.Order {
actions = append(actions, adjustTraceKey(&tmp.Order[idx].Key.TelemetryFieldKey, keys)...)
}
// Copy back the three slices that the helpers above can rewrite
// (AdjustDuplicateKeys reconstructs them, adjustTraceKey mutates in place).
// Aggregations is only read by the helpers, never reassigned, so no copy-back.
b.operator.SelectFields = tmp.SelectFields
b.operator.GroupBy = tmp.GroupBy
b.operator.Order = tmp.Order
for _, action := range actions {
b.stmtBuilder.logger.InfoContext(ctx, "key adjustment action", slog.String("action", action))
}
}
func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySelector {
var keySelectors []*telemetrytypes.FieldKeySelector
@@ -545,6 +579,15 @@ func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySe
})
}
for _, sf := range b.operator.SelectFields {
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
Name: sf.Name,
Signal: telemetrytypes.SignalTraces,
FieldContext: sf.FieldContext,
FieldDataType: sf.FieldDataType,
})
}
for i := range keySelectors {
keySelectors[i].Signal = telemetrytypes.SignalTraces
}
@@ -552,7 +595,7 @@ func (b *traceOperatorCTEBuilder) getKeySelectors() []*telemetrytypes.FieldKeySe
return keySelectors
}
func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select(fmt.Sprintf(
@@ -560,12 +603,6 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
int64(b.operator.StepInterval.Seconds()),
))
keySelectors := b.getKeySelectors()
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
var allGroupByArgs []any
for _, gb := range b.operator.GroupBy {
@@ -644,8 +681,7 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
combinedArgs := append(allGroupByArgs, allAggChArgs...)
// Add HAVING clause if specified
err = b.addHavingClause(sb)
if err != nil {
if err := b.addHavingClause(sb); err != nil {
return nil, err
}
@@ -672,17 +708,11 @@ func (b *traceOperatorCTEBuilder) buildTraceSummaryCTE(selectFromCTE string) {
b.addCTE("trace_summary", sql, args, []string{"all_spans", selectFromCTE})
}
func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
b.buildTraceSummaryCTE(selectFromCTE)
sb := sqlbuilder.NewSelectBuilder()
keySelectors := b.getKeySelectors()
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
var allGroupByArgs []any
for _, gb := range b.operator.GroupBy {
@@ -764,8 +794,7 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
sb.GroupBy(groupByKeys...)
}
err = b.addHavingClause(sb)
if err != nil {
if err := b.addHavingClause(sb); err != nil {
return nil, err
}
@@ -821,15 +850,9 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
}, nil
}
func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFromCTE string) (*qbtypes.Statement, error) {
func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFromCTE string, keys map[string][]*telemetrytypes.TelemetryFieldKey) (*qbtypes.Statement, error) {
sb := sqlbuilder.NewSelectBuilder()
keySelectors := b.getKeySelectors()
keys, _, err := b.stmtBuilder.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
var allGroupByArgs []any
for _, gb := range b.operator.GroupBy {
@@ -911,8 +934,7 @@ func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFr
combinedArgs := append(allGroupByArgs, allAggChArgs...)
// Add HAVING clause if specified
err = b.addHavingClause(sb)
if err != nil {
if err := b.addHavingClause(sb); err != nil {
return nil, err
}

View File

@@ -2,6 +2,7 @@ package telemetrytraces
import (
"context"
"strings"
"testing"
"time"
@@ -14,6 +15,24 @@ import (
"github.com/stretchr/testify/require"
)
func newTestTraceOperatorStatementBuilder(t *testing.T) *traceOperatorStatementBuilder {
t.Helper()
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
traceStmtBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore, fm, cb, aggExprRewriter, nil, fl,
)
return NewTraceOperatorStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore, fm, cb, traceStmtBuilder, aggExprRewriter, fl,
)
}
func TestTraceOperatorStatementBuilder(t *testing.T) {
cases := []struct {
name string
@@ -463,32 +482,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
traceStmtBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
aggExprRewriter,
nil,
fl,
)
statementBuilder := NewTraceOperatorStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
traceStmtBuilder,
aggExprRewriter,
fl,
)
statementBuilder := newTestTraceOperatorStatementBuilder(t)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
@@ -579,32 +573,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fl := flaggertest.New(t)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
traceStmtBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
aggExprRewriter,
nil,
fl,
)
statementBuilder := NewTraceOperatorStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
traceStmtBuilder,
aggExprRewriter,
fl,
)
statementBuilder := newTestTraceOperatorStatementBuilder(t)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
@@ -626,3 +595,143 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
})
}
}
func TestTraceOperatorStatementBuilderAdjustsKeys(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
operator qbtypes.QueryBuilderTraceOperator
builderFilter string
wantSQL string
wantArgs []any
}{
{
name: "deprecated duration filter in referenced builder query",
requestType: qbtypes.RequestTypeRaw,
operator: qbtypes.QueryBuilderTraceOperator{
Expression: "A",
Limit: 10,
},
builderFilter: "durationNano = '3s'",
wantSQL: "duration_nano = ?",
wantArgs: []any{int64(3000000000)},
},
{
name: "context-prefixed aggregation alias in order by",
requestType: qbtypes.RequestTypeScalar,
operator: qbtypes.QueryBuilderTraceOperator{
Expression: "A",
Aggregations: []qbtypes.TraceAggregation{
{
Expression: "count()",
Alias: "span.count_",
},
},
Order: []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "count_",
FieldContext: telemetrytypes.FieldContextSpan,
},
},
Direction: qbtypes.OrderDirectionDesc,
},
},
},
wantSQL: "ORDER BY __result_0 desc",
},
}
statementBuilder := newTestTraceOperatorStatementBuilder(t)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
err := c.operator.ParseExpression()
require.NoError(t, err)
filter := c.builderFilter
if filter == "" {
filter = "service.name = 'frontend'"
}
q, err := statementBuilder.Build(
context.Background(),
1747947419000,
1747983448000,
c.requestType,
c.operator,
&qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: filter},
},
},
},
},
)
require.NoError(t, err)
require.Contains(t, q.Query, c.wantSQL)
for _, arg := range c.wantArgs {
require.Contains(t, q.Args, arg)
}
})
}
}
// TestTraceOperatorStatementBuilderDeduplicatesKeys checks that a trace
// operator with the same field name listed twice in GroupBy (once with a
// context, once without) ends up with a single column in the outer SELECT
// and a single entry in GROUP BY.
func TestTraceOperatorStatementBuilderDeduplicatesKeys(t *testing.T) {
statementBuilder := newTestTraceOperatorStatementBuilder(t)
operator := qbtypes.QueryBuilderTraceOperator{
Expression: "A",
Aggregations: []qbtypes.TraceAggregation{
{Expression: "count()"},
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "http.method",
FieldContext: telemetrytypes.FieldContextAttribute,
}},
// Same name, no context — should be merged with the entry above.
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "http.method",
}},
},
}
require.NoError(t, operator.ParseExpression())
q, err := statementBuilder.Build(
context.Background(),
1747947419000,
1747983448000,
qbtypes.RequestTypeScalar,
operator,
&qbtypes.CompositeQuery{
Queries: []qbtypes.QueryEnvelope{
{
Type: qbtypes.QueryTypeBuilder,
Spec: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
Filter: &qbtypes.Filter{Expression: "service.name = 'frontend'"},
},
},
},
},
)
require.NoError(t, err)
require.Equal(t, 1, strings.Count(q.Query, "AS `http.method`"),
"http.method should appear once in SELECT after dedup, got: %s", q.Query)
require.NotContains(t, q.Query, "`http.method`, `http.method`",
"GROUP BY should list http.method once after dedup, got: %s", q.Query)
}

View File

@@ -1,6 +1,7 @@
package spantypes
import (
"regexp"
"slices"
"github.com/SigNoz/signoz/pkg/errors"
@@ -8,6 +9,8 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
)
var validAggregationFieldName = regexp.MustCompile(`^[a-zA-Z0-9._\-]+$`)
const maxAggregationItems = 10
var ErrTooManyAggregationItems = errors.NewInvalidInputf(errors.CodeInvalidInput, "aggregations request exceeds maximum of %d items", maxAggregationItems)
@@ -25,16 +28,16 @@ var (
// SpanAggregation is a single aggregation request item: which field to group by and how.
type SpanAggregation struct {
Field telemetrytypes.TelemetryFieldKey `json:"field"`
Aggregation SpanAggregationType `json:"aggregation"`
Field telemetrytypes.TelemetryFieldKey `json:"field" required:"true" nullable:"false"`
Aggregation SpanAggregationType `json:"aggregation" required:"true" nullable:"false"`
}
// SpanAggregationResult is the computed result for one aggregation request item.
// Duration values are in milliseconds.
type SpanAggregationResult struct {
Field telemetrytypes.TelemetryFieldKey `json:"field"`
Aggregation SpanAggregationType `json:"aggregation"`
Value map[string]uint64 `json:"value" nullable:"true"`
Field telemetrytypes.TelemetryFieldKey `json:"field" required:"true" nullable:"false"`
Aggregation SpanAggregationType `json:"aggregation" required:"true" nullable:"false"`
Value map[string]uint64 `json:"value" required:"true" nullable:"true"`
}
func (SpanAggregationType) Enum() []any {
@@ -48,3 +51,35 @@ func (SpanAggregationType) Enum() []any {
func (s SpanAggregationType) isValid() bool {
return slices.ContainsFunc(s.Enum(), func(v any) bool { return v == s })
}
// PostableTraceAggregations is the request body for the V4 aggregations endpoint.
type PostableTraceAggregations struct {
Aggregations []SpanAggregation `json:"aggregations" required:"true" nullable:"false"`
}
func (p *PostableTraceAggregations) Validate() error {
if len(p.Aggregations) == 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "aggregations is required and must not be empty")
}
if len(p.Aggregations) > maxAggregationItems {
return ErrTooManyAggregationItems
}
for _, a := range p.Aggregations {
if !a.Aggregation.isValid() {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "unknown aggregation type: %q", a.Aggregation)
}
if a.Field.FieldContext != telemetrytypes.FieldContextResource {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "aggregation field context must be %q, got %q",
telemetrytypes.FieldContextResource, a.Field.FieldContext)
}
if !validAggregationFieldName.MatchString(a.Field.Name) {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid field name: %q", a.Field.Name)
}
}
return nil
}
// GettableTraceAggregations is the response for the V4 aggregations endpoint.
type GettableTraceAggregations struct {
Aggregations []SpanAggregationResult `json:"aggregations" required:"true" nullable:"false"`
}

View File

@@ -0,0 +1,22 @@
package spantypestest
import (
cmock "github.com/SigNoz/clickhouse-go-mock"
"github.com/SigNoz/signoz/pkg/types/spantypes"
)
// TraceStoreTest pairs a TraceStore with the ClickHouse mock.
type TraceStoreTest struct {
store spantypes.TraceStore
mock cmock.ClickConnMockCommon
}
func New(store spantypes.TraceStore, mock cmock.ClickConnMockCommon) *TraceStoreTest {
return &TraceStoreTest{store: store, mock: mock}
}
// Store returns the TraceStore for calling methods under test.
func (t *TraceStoreTest) Store() spantypes.TraceStore { return t.store }
// Mock returns the ClickHouse mock for setting query expectations.
func (t *TraceStoreTest) Mock() cmock.ClickConnMockCommon { return t.mock }

View File

@@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -29,4 +30,7 @@ type TraceStore interface {
GetTraceSpans(ctx context.Context, traceID string, summary *TraceSummary) ([]StorableSpan, error)
GetMinimalSpans(ctx context.Context, traceID string, start, end time.Time) ([]MinimalSpan, error)
GetTraceSpansByIDs(ctx context.Context, traceID string, start, end time.Time, spanIDs []string) ([]StorableSpan, error)
GetSpanCountByField(ctx context.Context, traceID string, summary *TraceSummary, fieldKey telemetrytypes.TelemetryFieldKey) (map[string]uint64, error)
GetSpanDurationByField(ctx context.Context, traceID string, summary *TraceSummary, fieldKey telemetrytypes.TelemetryFieldKey) (map[string]uint64, error)
}

View File

@@ -459,6 +459,57 @@ def find_named_result(
)
def assert_scalar_value(
response: requests.Response,
name: str,
expected: Any,
*,
row: int = 0,
col: int = 0,
) -> None:
"""Assert that the named scalar result has `expected` at data[row][col]."""
result = find_named_result(response.json()["data"]["data"]["results"], name)
assert result is not None, f"no result for query {name}"
assert result["data"][row][col] == expected, f"expected {expected} at [{row}][{col}], got {result['data'][row][col]}"
def assert_grouped_scalar(
response: requests.Response,
name: str,
*,
expected_groups: int,
expected_columns: int,
last_col_value: Any | None = None,
) -> None:
"""Assert grouped scalar result has the expected column count and group count.
If `last_col_value` is set and there is exactly one group, also assert the
last column of that single row equals it (a common aggregation-value check)."""
result = find_named_result(response.json()["data"]["data"]["results"], name)
assert result is not None, f"no result for query {name}"
columns = result["columns"]
rows = result["data"]
assert len(columns) == expected_columns, f"expected {expected_columns} columns, got {len(columns)}: {columns}"
assert len(rows) == expected_groups, f"expected {expected_groups} groups, got {len(rows)}: {rows}"
if last_col_value is not None and expected_groups == 1:
assert rows[0][-1] == last_col_value, f"expected last col {last_col_value}, got row {rows[0]}"
def assert_raw_row_subset(
response: requests.Response,
name: str,
expected: dict[str, Any],
*,
row: int = 0,
) -> None:
"""Assert that the named raw result's rows[row]['data'] is a superset of `expected`."""
result = find_named_result(response.json()["data"]["data"]["results"], name)
assert result is not None, f"no result for query {name}"
rows = result["rows"]
assert rows is not None, f"no rows for query {name}"
data = rows[row]["data"]
assert expected.items() <= data.items(), f"expected subset {expected}, got data {data}"
def build_scalar_query(
name: str,
signal: str,

View File

@@ -9,8 +9,11 @@ import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.querier import (
assert_grouped_scalar,
assert_identical_query_response,
assert_minutely_bucket_values,
assert_raw_row_subset,
assert_scalar_value,
find_named_result,
format_timestamp,
generate_traces_with_corrupt_metadata,
@@ -693,6 +696,176 @@ def test_traces_list_with_corrupt_data(
assert data[key] == value
def _expected_trace_subset(trace: Traces) -> dict[str, Any]:
return {
"duration_nano": trace.duration_nano,
"name": trace.name,
"parent_span_id": trace.parent_span_id,
"span_id": trace.span_id,
"timestamp": format_timestamp(trace.timestamp),
"trace_id": trace.trace_id,
}
@pytest.mark.parametrize(
"payload_factory,request_type,assert_result",
[
# Case 1: CTE filter uses the deprecated intrinsic field `durationNano`.
pytest.param(
lambda traces: [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"filter": {"expression": 'durationNano = "3s"'},
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "traces",
"filter": {"expression": 'durationNano = "5s"'},
},
},
{
"type": "builder_trace_operator",
"spec": {
"name": "C",
"expression": "A => B",
"limit": 1,
},
},
],
"raw",
lambda response, traces: assert_raw_row_subset(response, "C", _expected_trace_subset(traces[0])),
id="deprecated-intrinsic-filter",
),
# Case 2: CTE filter uses the deprecated calculated field `responseStatusCode`.
pytest.param(
lambda traces: [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"filter": {"expression": 'responseStatusCode = "200"'},
},
},
{
"type": "builder_query",
"spec": {
"name": "B",
"signal": "traces",
"filter": {"expression": 'durationNano = "5s"'},
},
},
{
"type": "builder_trace_operator",
"spec": {
"name": "C",
"expression": "A => B",
"limit": 1,
},
},
],
"raw",
lambda response, traces: assert_raw_row_subset(response, "C", _expected_trace_subset(traces[0])),
id="deprecated-calculated-filter",
),
# Case 3: order by uses `count_` with fieldContext `span`, which has
# to be rewritten to the aggregation alias `span.count_`.
pytest.param(
lambda traces: [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_trace_operator",
"spec": {
"name": "C",
"expression": "A",
"aggregations": [{"expression": "count()", "alias": "span.count_"}],
"order": [{"key": {"name": "count_", "fieldContext": "span"}, "direction": "desc"}],
},
},
],
"scalar",
lambda response, traces: assert_scalar_value(response, "C", len(traces)),
id="context-prefixed-aggregation-alias-order",
),
# Case 4: group by lists `cloud.provider` twice (once with a resource
# context, once without).
pytest.param(
lambda traces: [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"disabled": True,
"aggregations": [{"expression": "count()"}],
},
},
{
"type": "builder_trace_operator",
"spec": {
"name": "C",
"expression": "A",
"aggregations": [{"expression": "count()"}],
"groupBy": [
{"name": "cloud.provider", "fieldContext": "resource"},
{"name": "cloud.provider"},
],
},
},
],
"scalar",
lambda response, traces: assert_grouped_scalar(response, "C", expected_groups=1, expected_columns=2, last_col_value=len(traces)),
id="duplicate-group-by-deduplicated",
),
],
)
def test_trace_operator_with_adjusted_keys(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
payload_factory: Callable[[list[Traces]], list[dict[str, Any]]],
request_type: str,
assert_result: Callable[[requests.Response, list[Traces]], None],
) -> None:
"""
Trace operators build a CTE per referenced builder query and an outer
query on top. Both layers need the same key adjustment as regular trace
queries, otherwise deprecated keys and context-prefixed aliases don't
resolve.
"""
traces = generate_traces_with_corrupt_metadata()
insert_traces(traces)
payload = payload_factory(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = make_query_request(
signoz,
token,
start_ms=int((datetime.now(tz=UTC) - timedelta(minutes=5)).timestamp() * 1000),
end_ms=int(datetime.now(tz=UTC).timestamp() * 1000),
request_type=request_type,
queries=payload,
)
assert response.status_code == HTTPStatus.OK, response.text
assert_result(response, traces)
@pytest.mark.parametrize(
"order_by,aggregation_alias,expected_status",
[