Compare commits

..

4 Commits

Author SHA1 Message Date
Nikhil Soni
851d857807 feat: add implementation for aggregation store 2026-05-27 00:25:01 +05:30
Nikhil Soni
dd76af883a chore: update openapi specs 2026-05-26 16:15:43 +05:30
Nikhil Soni
412ec1813a chore: use v1 for aggregations api 2026-05-26 16:15:43 +05:30
Nikhil Soni
cb7b183171 feat: add endpoint & module for trace aggregations 2026-05-26 16:15:43 +05:30
11 changed files with 464 additions and 8 deletions

View File

@@ -5672,6 +5672,14 @@ components:
required:
- items
type: object
SpantypesGettableTraceAggregations:
properties:
aggregations:
items:
$ref: '#/components/schemas/SpantypesSpanAggregationResult'
nullable: true
type: array
type: object
SpantypesGettableWaterfallTrace:
properties:
aggregations:
@@ -5737,6 +5745,14 @@ components:
- name
- condition
type: object
SpantypesPostableTraceAggregations:
properties:
aggregations:
items:
$ref: '#/components/schemas/SpantypesSpanAggregation'
nullable: true
type: array
type: object
SpantypesPostableWaterfall:
properties:
aggregations:
@@ -11364,6 +11380,75 @@ paths:
summary: Test notification channel (deprecated)
tags:
- channels
/api/v1/traces/{traceID}/aggregations:
post:
deprecated: false
description: Computes span aggregations grouped by requested field.
operationId: GetTraceAggregations
parameters:
- in: path
name: traceID
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/SpantypesPostableTraceAggregations'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/SpantypesGettableTraceAggregations'
status:
type: string
required:
- status
- data
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get aggregations for a trace
tags:
- tracedetail
/api/v1/user:
get:
deprecated: false

View File

@@ -6775,6 +6775,13 @@ export interface SpantypesSpanAggregationResultDTO {
value?: SpantypesSpanAggregationResultDTOValue;
}
export interface SpantypesGettableTraceAggregationsDTO {
/**
* @type array,null
*/
aggregations?: SpantypesSpanAggregationResultDTO[] | null;
}
export type SpantypesWaterfallSpanDTOAttributesAnyOf = {
[key: string]: unknown;
};
@@ -7018,6 +7025,13 @@ export interface SpantypesSpanAggregationDTO {
field?: TelemetrytypesTelemetryFieldKeyDTO;
}
export interface SpantypesPostableTraceAggregationsDTO {
/**
* @type array,null
*/
aggregations?: SpantypesSpanAggregationDTO[] | null;
}
export interface SpantypesPostableWaterfallDTO {
/**
* @type array,null
@@ -8351,6 +8365,17 @@ export type UpdateSpanMapperPathParameters = {
groupId: string;
mapperId: string;
};
export type GetTraceAggregationsPathParameters = {
traceID: string;
};
export type GetTraceAggregations200 = {
data: SpantypesGettableTraceAggregationsDTO;
/**
* @type string
*/
status: string;
};
export type ListUsersDeprecated200 = {
/**
* @type array

View File

@@ -12,17 +12,120 @@ import type {
} from 'react-query';
import type {
GetTraceAggregations200,
GetTraceAggregationsPathParameters,
GetWaterfall200,
GetWaterfallPathParameters,
GetWaterfallV4200,
GetWaterfallV4PathParameters,
RenderErrorResponseDTO,
SpantypesPostableTraceAggregationsDTO,
SpantypesPostableWaterfallDTO,
} from '../sigNoz.schemas';
import { GeneratedAPIInstance } from '../../../generatedAPIInstance';
import type { ErrorType, BodyType } from '../../../generatedAPIInstance';
/**
* Computes span aggregations grouped by requested field.
* @summary Get aggregations for a trace
*/
export const getTraceAggregations = (
{ traceID }: GetTraceAggregationsPathParameters,
spantypesPostableTraceAggregationsDTO?: BodyType<SpantypesPostableTraceAggregationsDTO>,
signal?: AbortSignal,
) => {
return GeneratedAPIInstance<GetTraceAggregations200>({
url: `/api/v1/traces/${traceID}/aggregations`,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: spantypesPostableTraceAggregationsDTO,
signal,
});
};
export const getGetTraceAggregationsMutationOptions = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof getTraceAggregations>>,
TError,
{
pathParams: GetTraceAggregationsPathParameters;
data?: BodyType<SpantypesPostableTraceAggregationsDTO>;
},
TContext
>;
}): UseMutationOptions<
Awaited<ReturnType<typeof getTraceAggregations>>,
TError,
{
pathParams: GetTraceAggregationsPathParameters;
data?: BodyType<SpantypesPostableTraceAggregationsDTO>;
},
TContext
> => {
const mutationKey = ['getTraceAggregations'];
const { mutation: mutationOptions } = options
? options.mutation &&
'mutationKey' in options.mutation &&
options.mutation.mutationKey
? options
: { ...options, mutation: { ...options.mutation, mutationKey } }
: { mutation: { mutationKey } };
const mutationFn: MutationFunction<
Awaited<ReturnType<typeof getTraceAggregations>>,
{
pathParams: GetTraceAggregationsPathParameters;
data?: BodyType<SpantypesPostableTraceAggregationsDTO>;
}
> = (props) => {
const { pathParams, data } = props ?? {};
return getTraceAggregations(pathParams, data);
};
return { mutationFn, ...mutationOptions };
};
export type GetTraceAggregationsMutationResult = NonNullable<
Awaited<ReturnType<typeof getTraceAggregations>>
>;
export type GetTraceAggregationsMutationBody =
| BodyType<SpantypesPostableTraceAggregationsDTO>
| undefined;
export type GetTraceAggregationsMutationError =
ErrorType<RenderErrorResponseDTO>;
/**
* @summary Get aggregations for a trace
*/
export const useGetTraceAggregations = <
TError = ErrorType<RenderErrorResponseDTO>,
TContext = unknown,
>(options?: {
mutation?: UseMutationOptions<
Awaited<ReturnType<typeof getTraceAggregations>>,
TError,
{
pathParams: GetTraceAggregationsPathParameters;
data?: BodyType<SpantypesPostableTraceAggregationsDTO>;
},
TContext
>;
}): UseMutationResult<
Awaited<ReturnType<typeof getTraceAggregations>>,
TError,
{
pathParams: GetTraceAggregationsPathParameters;
data?: BodyType<SpantypesPostableTraceAggregationsDTO>;
},
TContext
> => {
return useMutation(getGetTraceAggregationsMutationOptions(options));
};
/**
* Returns the waterfall view of spans for a given trace ID with tree structure, metadata, and windowed pagination
* @summary Get waterfall view for a trace

View File

@@ -1,4 +1,4 @@
import { forwardRef, type HTMLAttributes, type ReactNode } from 'react';
import type { HTMLAttributes, ReactNode } from 'react';
import cx from 'classnames';
import tableStyles from './TanStackTable.module.scss';
@@ -22,19 +22,21 @@ type WithDangerousHtml = BaseProps & {
export type TanStackTableTextProps = WithChildren | WithDangerousHtml;
const TanStackTableText = forwardRef<HTMLSpanElement, TanStackTableTextProps>(
({ children, className, dangerouslySetInnerHTML, ...rest }, ref) => (
function TanStackTableText({
children,
className,
dangerouslySetInnerHTML,
...rest
}: TanStackTableTextProps): JSX.Element {
return (
<span
ref={ref}
className={cx(tableStyles.tableCellText, className)}
dangerouslySetInnerHTML={dangerouslySetInnerHTML}
{...rest}
>
{children}
</span>
),
);
TanStackTableText.displayName = 'TanStackTableText';
);
}
export default TanStackTableText;

View File

@@ -48,5 +48,24 @@ func (provider *provider) addTraceDetailRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v1/traces/{traceID}/aggregations", handler.New(
provider.authzMiddleware.ViewAccess(provider.traceDetailHandler.GetTraceAggregations),
handler.OpenAPIDef{
ID: "GetTraceAggregations",
Tags: []string{"tracedetail"},
Summary: "Get aggregations for a trace",
Description: "Computes span aggregations grouped by requested field.",
Request: new(spantypes.PostableTraceAggregations),
RequestContentType: "application/json",
Response: new(spantypes.GettableTraceAggregations),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
},
)).Methods(http.MethodPost).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -59,3 +59,24 @@ func (h *handler) GetWaterfallV4(rw http.ResponseWriter, r *http.Request) {
render.Success(rw, http.StatusOK, result)
}
func (h *handler) GetTraceAggregations(rw http.ResponseWriter, r *http.Request) {
req := new(spantypes.PostableTraceAggregations)
if err := binding.JSON.BindBody(r.Body, req); err != nil {
render.Error(rw, err)
return
}
if err := req.Validate(); err != nil {
render.Error(rw, err)
return
}
result, err := h.module.GetTraceAggregations(r.Context(), mux.Vars(r)["traceID"], req)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, result)
}

View File

@@ -105,6 +105,49 @@ func (m *module) getFullWaterfall(ctx context.Context, traceID string, summary *
return spantypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, nil, true, nil), nil
}
func (m *module) GetTraceAggregations(ctx context.Context, traceID string, req *spantypes.PostableTraceAggregations) (*spantypes.GettableTraceAggregations, error) {
summary, err := m.store.GetTraceSummary(ctx, traceID)
if err != nil {
return nil, err
}
traceDurationNs := uint64(summary.End.UnixNano()) - uint64(summary.Start.UnixNano())
results := make([]spantypes.SpanAggregationResult, 0, len(req.Aggregations))
for _, agg := range req.Aggregations {
result := spantypes.SpanAggregationResult{Field: agg.Field, Aggregation: agg.Aggregation}
switch agg.Aggregation {
case spantypes.SpanAggregationSpanCount:
result.Value, err = m.store.GetSpanCountByField(ctx, traceID, summary, agg.Field)
if err != nil {
return nil, err
}
case spantypes.SpanAggregationDuration:
durationNs, err2 := m.store.GetSpanDurationByField(ctx, traceID, summary, agg.Field)
if err2 != nil {
return nil, err2
}
result.Value = make(map[string]uint64, len(durationNs))
for k, ns := range durationNs {
result.Value[k] = ns / 1_000_000
}
case spantypes.SpanAggregationExecutionTimePercentage:
durationNs, err2 := m.store.GetSpanDurationByField(ctx, traceID, summary, agg.Field)
if err2 != nil {
return nil, err2
}
result.Value = make(map[string]uint64, len(durationNs))
if traceDurationNs > 0 {
for k, ns := range durationNs {
result.Value[k] = ns * 100 / traceDurationNs
}
}
}
results = append(results, result)
}
return &spantypes.GettableTraceAggregations{Aggregations: results}, nil
}
// getWindowedWaterfall builds the waterfall tree with minimal data and then returns only a window of full spans.
func (m *module) getWindowedWaterfall(ctx context.Context, traceID, selectedSpanID string, uncollapsedSpans []string, start, end time.Time) (*spantypes.GettableWaterfallTrace, error) {
// Step 1: minimal fetch → build full tree → select visible window

View File

@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"regexp"
"time"
sqlbuilder "github.com/huandu/go-sqlbuilder"
@@ -11,10 +12,33 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const colServiceName = `resource_string_service$$$$name` // $ gets escaped so $$$$ converts to $$.
// validFieldName only allows characters safe to embed as ClickHouse map subscript literals.
var validFieldName = regexp.MustCompile(`^[a-zA-Z0-9._\-]+$`)
// buildFieldExpr returns a ClickHouse SQL expression for the value of fieldKey per span.
func buildFieldExpr(fieldKey telemetrytypes.TelemetryFieldKey) (string, error) {
if !validFieldName.MatchString(fieldKey.Name) {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid field name: %q", fieldKey.Name)
}
n := fieldKey.Name
switch fieldKey.FieldContext {
case telemetrytypes.FieldContextResource:
return fmt.Sprintf("resources_string['%s']", n), nil
case telemetrytypes.FieldContextAttribute:
return fmt.Sprintf(
"multiIf(mapContains(attributes_string,'%[1]s'),attributes_string['%[1]s'],"+
"mapContains(attributes_number,'%[1]s'),toString(attributes_number['%[1]s']),"+
"mapContains(attributes_bool,'%[1]s'),toString(attributes_bool['%[1]s']),'')",
n), nil
}
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported field context: %v", fieldKey.FieldContext)
}
type traceStore struct {
telemetryStore telemetrystore.TelemetryStore
}
@@ -97,6 +121,14 @@ func (s *traceStore) GetMinimalSpans(ctx context.Context, traceID string, start,
return spans, nil
}
func (s *traceStore) GetSpanCountByField(ctx context.Context, traceID string, summary *spantypes.TraceSummary, fieldKey telemetrytypes.TelemetryFieldKey) (map[string]uint64, error) {
return nil, nil
}
func (s *traceStore) GetSpanDurationByField(ctx context.Context, traceID string, summary *spantypes.TraceSummary, fieldKey telemetrytypes.TelemetryFieldKey) (map[string]uint64, error) {
return nil, nil
}
func (s *traceStore) GetTraceSpansByIDs(ctx context.Context, traceID string, start, end time.Time, spanIDs []string) ([]spantypes.StorableSpan, error) {
if len(spanIDs) == 0 {
return []spantypes.StorableSpan{}, nil
@@ -133,3 +165,96 @@ func (s *traceStore) GetTraceSpansByIDs(ctx context.Context, traceID string, sta
}
return spans, nil
}
type spanCountRow struct {
FieldValue string `ch:"field_value"`
Count uint64 `ch:"count"`
}
func (s *traceStore) GetSpanCountByField(ctx context.Context, traceID string, summary *spantypes.TraceSummary, fieldKey telemetrytypes.TelemetryFieldKey) (map[string]uint64, error) {
fieldExpr, err := buildFieldExpr(fieldKey)
if err != nil {
return nil, err
}
query := fmt.Sprintf(`
SELECT %[1]s AS field_value, count() AS count
FROM %[2]s.%[3]s
WHERE trace_id=? AND ts_bucket_start>=? AND ts_bucket_start<=?
AND %[1]s != ''
GROUP BY field_value`,
fieldExpr, spantypes.TraceDB, spantypes.TraceTable,
)
var rows []spanCountRow
if err := s.telemetryStore.ClickhouseDB().Select(ctx, &rows, query,
traceID, summary.Start.Unix()-1800, summary.End.Unix(),
); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error querying span count by field")
}
result := make(map[string]uint64, len(rows))
for _, r := range rows {
result[r.FieldValue] = r.Count
}
return result, nil
}
type spanDurationRow struct {
FieldValue string `ch:"field_value"`
TotalNs uint64 `ch:"total_ns"`
}
func (s *traceStore) GetSpanDurationByField(ctx context.Context, traceID string, summary *spantypes.TraceSummary, fieldKey telemetrytypes.TelemetryFieldKey) (map[string]uint64, error) {
fieldExpr, err := buildFieldExpr(fieldKey)
if err != nil {
return nil, err
}
// 4-level query: deduplicate → window function → non-overlapping per span → sum per field.
// The window function computes the running max end time of preceding spans in the same
// field partition (ordered by start), so each span only contributes its non-overlapping
// tail — matching the Go-side mergeSpanIntervals semantics in V3.
query := fmt.Sprintf(`
SELECT field_value, sum(non_overlapping_ns) AS total_ns
FROM (
SELECT
field_value,
multiIf(
start_ns >= prev_max_end_ns, duration_nano,
start_ns + duration_nano > prev_max_end_ns, start_ns + duration_nano - prev_max_end_ns,
0
) AS non_overlapping_ns
FROM (
SELECT
field_value,
start_ns,
duration_nano,
ifNull(max(start_ns + duration_nano) OVER (
PARTITION BY field_value
ORDER BY start_ns
ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
), 0) AS prev_max_end_ns
FROM (
SELECT DISTINCT ON (span_id)
%[1]s AS field_value,
toUnixTimestamp64Nano(timestamp) AS start_ns,
duration_nano
FROM %[2]s.%[3]s
WHERE trace_id=? AND ts_bucket_start>=? AND ts_bucket_start<=?
ORDER BY toUnixTimestamp64Nano(timestamp) ASC, name ASC
)
WHERE field_value != ''
)
)
GROUP BY field_value`,
fieldExpr, spantypes.TraceDB, spantypes.TraceTable,
)
var rows []spanDurationRow
if err := s.telemetryStore.ClickhouseDB().Select(ctx, &rows, query,
traceID, summary.Start.Unix()-1800, summary.End.Unix(),
); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error querying span duration by field")
}
result := make(map[string]uint64, len(rows))
for _, r := range rows {
result[r.FieldValue] = r.TotalNs
}
return result, nil
}

View File

@@ -11,10 +11,12 @@ import (
type Handler interface {
GetWaterfall(http.ResponseWriter, *http.Request)
GetWaterfallV4(http.ResponseWriter, *http.Request)
GetTraceAggregations(http.ResponseWriter, *http.Request)
}
// Module defines the business logic for trace detail operations.
type Module interface {
GetWaterfall(ctx context.Context, traceID string, req *spantypes.PostableWaterfall) (*spantypes.GettableWaterfallTrace, error)
GetWaterfallV4(ctx context.Context, traceID string, selectedSpanID string, uncollapsedSpans []string, selectAllLimit uint) (*spantypes.GettableWaterfallTrace, error)
GetTraceAggregations(ctx context.Context, traceID string, req *spantypes.PostableTraceAggregations) (*spantypes.GettableTraceAggregations, error)
}

View File

@@ -48,3 +48,30 @@ func (SpanAggregationType) Enum() []any {
func (s SpanAggregationType) isValid() bool {
return slices.ContainsFunc(s.Enum(), func(v any) bool { return v == s })
}
// PostableTraceAggregations is the request body for the V4 aggregations endpoint.
type PostableTraceAggregations struct {
Aggregations []SpanAggregation `json:"aggregations"`
}
func (p *PostableTraceAggregations) Validate() error {
if len(p.Aggregations) > maxAggregationItems {
return ErrTooManyAggregationItems
}
for _, a := range p.Aggregations {
if !a.Aggregation.isValid() {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "unknown aggregation type: %q", a.Aggregation)
}
fc := a.Field.FieldContext
if fc != telemetrytypes.FieldContextResource && fc != telemetrytypes.FieldContextAttribute {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "aggregation field context must be %q or %q, got %q",
telemetrytypes.FieldContextResource, telemetrytypes.FieldContextAttribute, fc)
}
}
return nil
}
// GettableTraceAggregations is the response for the V4 aggregations endpoint.
type GettableTraceAggregations struct {
Aggregations []SpanAggregationResult `json:"aggregations"`
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -29,4 +30,7 @@ type TraceStore interface {
GetTraceSpans(ctx context.Context, traceID string, summary *TraceSummary) ([]StorableSpan, error)
GetMinimalSpans(ctx context.Context, traceID string, start, end time.Time) ([]MinimalSpan, error)
GetTraceSpansByIDs(ctx context.Context, traceID string, start, end time.Time, spanIDs []string) ([]StorableSpan, error)
GetSpanCountByField(ctx context.Context, traceID string, summary *TraceSummary, fieldKey telemetrytypes.TelemetryFieldKey) (map[string]uint64, error)
GetSpanDurationByField(ctx context.Context, traceID string, summary *TraceSummary, fieldKey telemetrytypes.TelemetryFieldKey) (map[string]uint64, error)
}