mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-25 05:10:27 +01:00
Compare commits
7 Commits
main
...
ns/color-b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0cbd4e0a95 | ||
|
|
19254f84cc | ||
|
|
261a5888ad | ||
|
|
8f1aab8e40 | ||
|
|
cc8064c5a6 | ||
|
|
1df7795386 | ||
|
|
62bfb0d5bd |
@@ -4603,6 +4603,11 @@ components:
|
||||
type: object
|
||||
TracedetailtypesGettableWaterfallTrace:
|
||||
properties:
|
||||
aggregations:
|
||||
items:
|
||||
$ref: '#/components/schemas/TracedetailtypesSpanAggregationResult'
|
||||
nullable: true
|
||||
type: array
|
||||
endTimestampMillis:
|
||||
minimum: 0
|
||||
type: integer
|
||||
@@ -4642,6 +4647,11 @@ components:
|
||||
type: object
|
||||
TracedetailtypesPostableWaterfall:
|
||||
properties:
|
||||
aggregations:
|
||||
items:
|
||||
$ref: '#/components/schemas/TracedetailtypesSpanAggregation'
|
||||
nullable: true
|
||||
type: array
|
||||
limit:
|
||||
minimum: 0
|
||||
type: integer
|
||||
@@ -4653,6 +4663,32 @@ components:
|
||||
nullable: true
|
||||
type: array
|
||||
type: object
|
||||
TracedetailtypesSpanAggregation:
|
||||
properties:
|
||||
aggregation:
|
||||
$ref: '#/components/schemas/TracedetailtypesSpanAggregationType'
|
||||
field:
|
||||
$ref: '#/components/schemas/TelemetrytypesTelemetryFieldKey'
|
||||
type: object
|
||||
TracedetailtypesSpanAggregationResult:
|
||||
properties:
|
||||
aggregation:
|
||||
$ref: '#/components/schemas/TracedetailtypesSpanAggregationType'
|
||||
field:
|
||||
$ref: '#/components/schemas/TelemetrytypesTelemetryFieldKey'
|
||||
value:
|
||||
additionalProperties:
|
||||
minimum: 0
|
||||
type: integer
|
||||
nullable: true
|
||||
type: object
|
||||
type: object
|
||||
TracedetailtypesSpanAggregationType:
|
||||
enum:
|
||||
- spanCount
|
||||
- executionTimePercentage
|
||||
- duration
|
||||
type: string
|
||||
TracedetailtypesWaterfallSpan:
|
||||
properties:
|
||||
attributes:
|
||||
|
||||
@@ -5593,6 +5593,11 @@ export type TracedetailtypesGettableWaterfallTraceDTOServiceNameToTotalDurationM
|
||||
{ [key: string]: number } | null;
|
||||
|
||||
export interface TracedetailtypesGettableWaterfallTraceDTO {
|
||||
/**
|
||||
* @type array
|
||||
* @nullable true
|
||||
*/
|
||||
aggregations?: TracedetailtypesSpanAggregationResultDTO[] | null;
|
||||
/**
|
||||
* @type integer
|
||||
* @minimum 0
|
||||
@@ -5647,6 +5652,11 @@ export interface TracedetailtypesGettableWaterfallTraceDTO {
|
||||
}
|
||||
|
||||
export interface TracedetailtypesPostableWaterfallDTO {
|
||||
/**
|
||||
* @type array
|
||||
* @nullable true
|
||||
*/
|
||||
aggregations?: TracedetailtypesSpanAggregationDTO[] | null;
|
||||
/**
|
||||
* @type integer
|
||||
* @minimum 0
|
||||
@@ -5663,6 +5673,33 @@ export interface TracedetailtypesPostableWaterfallDTO {
|
||||
uncollapsedSpans?: string[] | null;
|
||||
}
|
||||
|
||||
export interface TracedetailtypesSpanAggregationDTO {
|
||||
aggregation?: TracedetailtypesSpanAggregationTypeDTO;
|
||||
field?: TelemetrytypesTelemetryFieldKeyDTO;
|
||||
}
|
||||
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
export type TracedetailtypesSpanAggregationResultDTOValue = {
|
||||
[key: string]: number;
|
||||
} | null;
|
||||
|
||||
export interface TracedetailtypesSpanAggregationResultDTO {
|
||||
aggregation?: TracedetailtypesSpanAggregationTypeDTO;
|
||||
field?: TelemetrytypesTelemetryFieldKeyDTO;
|
||||
/**
|
||||
* @type object
|
||||
* @nullable true
|
||||
*/
|
||||
value?: TracedetailtypesSpanAggregationResultDTOValue;
|
||||
}
|
||||
|
||||
export enum TracedetailtypesSpanAggregationTypeDTO {
|
||||
spanCount = 'spanCount',
|
||||
executionTimePercentage = 'executionTimePercentage',
|
||||
duration = 'duration',
|
||||
}
|
||||
/**
|
||||
* @nullable
|
||||
*/
|
||||
|
||||
@@ -25,6 +25,11 @@ func (h *handler) GetWaterfall(rw http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := req.Validate(); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := h.module.GetWaterfall(r.Context(), mux.Vars(r)["traceID"], req)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
|
||||
@@ -37,7 +37,12 @@ func (m *module) GetWaterfall(ctx context.Context, traceID string, req *tracedet
|
||||
m.config.Waterfall.MaxDepthToAutoExpand,
|
||||
)
|
||||
|
||||
return tracedetailtypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, uncollapsedSpans, selectedAllSpans), nil
|
||||
aggregationResults := make([]tracedetailtypes.SpanAggregationResult, 0, len(req.Aggregations))
|
||||
for _, a := range req.Aggregations {
|
||||
aggregationResults = append(aggregationResults, waterfallTrace.GetSpanAggregation(a.Aggregation, a.Field))
|
||||
}
|
||||
|
||||
return tracedetailtypes.NewGettableWaterfallTrace(waterfallTrace, selectedSpans, uncollapsedSpans, selectedAllSpans, aggregationResults), nil
|
||||
}
|
||||
|
||||
// getTraceData returns the waterfall cache for the given traceID with fallback on DB.
|
||||
|
||||
@@ -260,7 +260,7 @@ func TestGetSelectedSpans_MultipleRoots(t *testing.T) {
|
||||
trace := getWaterfallTrace([]*tracedetailtypes.WaterfallSpan{root1, root2}, spanMap)
|
||||
spans, _ := trace.GetSelectedSpans([]string{"root1", "root2"}, "root1", 500, 5)
|
||||
|
||||
traceRespnose := tracedetailtypes.NewGettableWaterfallTrace(trace, spans, nil, false)
|
||||
traceRespnose := tracedetailtypes.NewGettableWaterfallTrace(trace, spans, nil, false, nil)
|
||||
|
||||
assert.Equal(t, []string{"root1", "child1", "root2", "child2"}, spanIDs(spans), "root1 subtree must precede root2 subtree")
|
||||
assert.Equal(t, "svc-a", traceRespnose.RootServiceName, "metadata comes from first root")
|
||||
@@ -567,7 +567,7 @@ func TestGetAllSpans(t *testing.T) {
|
||||
)
|
||||
trace := getWaterfallTrace([]*tracedetailtypes.WaterfallSpan{root}, nil)
|
||||
spans := trace.GetAllSpans()
|
||||
traceResponse := tracedetailtypes.NewGettableWaterfallTrace(trace, spans, nil, true)
|
||||
traceResponse := tracedetailtypes.NewGettableWaterfallTrace(trace, spans, nil, true, nil)
|
||||
assert.ElementsMatch(t, spanIDs(spans), []string{"root", "childA", "grandchildA", "leafA", "childB", "grandchildB", "leafB"})
|
||||
assert.Equal(t, "svc", traceResponse.RootServiceName)
|
||||
assert.Equal(t, "root-op", traceResponse.RootServiceEntryPoint)
|
||||
|
||||
@@ -1154,7 +1154,13 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
if err != nil {
|
||||
r.logger.Info("cache miss for getFlamegraphSpansForTrace", "traceID", traceID)
|
||||
|
||||
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error,links as references, resource_string_service$$name, name, events FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", r.TraceDB, r.traceTableName))
|
||||
selectCols := "timestamp, duration_nano, span_id, trace_id, has_error, links as references, resource_string_service$$name, name, events"
|
||||
if len(req.RequiredFields) > 0 {
|
||||
selectCols += ", attributes_string, attributes_number, attributes_bool, resources_string"
|
||||
}
|
||||
flamegraphQuery := fmt.Sprintf("SELECT %s FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3 ORDER BY timestamp ASC, name ASC", selectCols, r.TraceDB, r.traceTableName)
|
||||
|
||||
searchScanResponses, err := r.GetSpansForTrace(ctx, traceID, flamegraphQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1193,6 +1199,10 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, orgID
|
||||
Children: make([]*model.FlamegraphSpan, 0),
|
||||
}
|
||||
|
||||
if len(req.RequiredFields) > 0 {
|
||||
jsonItem.SetRequestedFields(item, req.RequiredFields)
|
||||
}
|
||||
|
||||
// metadata calculation
|
||||
startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
|
||||
if startTime == 0 || startTimeUnixNano < startTime {
|
||||
|
||||
@@ -2,6 +2,8 @@ package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
type InstantQueryMetricsParams struct {
|
||||
@@ -337,10 +339,11 @@ type GetWaterfallSpansForTraceWithMetadataParams struct {
|
||||
}
|
||||
|
||||
type GetFlamegraphSpansForTraceParams struct {
|
||||
SelectedSpanID string `json:"selectedSpanId"`
|
||||
Limit uint `json:"limit"`
|
||||
BoundaryStartTS uint64 `json:"boundaryStartTsMilli"`
|
||||
BoundaryEndTS uint64 `json:"boundarEndTsMilli"`
|
||||
SelectedSpanID string `json:"selectedSpanId"`
|
||||
Limit uint `json:"limit"`
|
||||
BoundaryStartTS uint64 `json:"boundaryStartTsMilli"`
|
||||
BoundaryEndTS uint64 `json:"boundarEndTsMilli"`
|
||||
RequiredFields []telemetrytypes.TelemetryFieldKey `json:"requiredFields"`
|
||||
}
|
||||
|
||||
type SpanFilterParams struct {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"github.com/prometheus/prometheus/util/stats"
|
||||
@@ -314,6 +315,30 @@ type FlamegraphSpan struct {
|
||||
Events []Event `json:"event"`
|
||||
References []OtelSpanRef `json:"references,omitempty"`
|
||||
Children []*FlamegraphSpan `json:"children"`
|
||||
Attributes map[string]any `json:"attributes,omitempty"`
|
||||
Resource map[string]string `json:"resource,omitempty"`
|
||||
}
|
||||
|
||||
// SetRequestedFields extracts the requested attribute/resource fields from item into s.
|
||||
func (s *FlamegraphSpan) SetRequestedFields(item SpanItemV2, fields []telemetrytypes.TelemetryFieldKey) {
|
||||
for _, field := range fields {
|
||||
switch field.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
if v, ok := item.Resources_string[field.Name]; ok && v != "" {
|
||||
if s.Resource == nil {
|
||||
s.Resource = make(map[string]string)
|
||||
}
|
||||
s.Resource[field.Name] = v
|
||||
}
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
if v := item.AttributeValue(field.Name); v != nil {
|
||||
if s.Attributes == nil {
|
||||
s.Attributes = make(map[string]any)
|
||||
}
|
||||
s.Attributes[field.Name] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type GetWaterfallSpansForTraceWithMetadataResponse struct {
|
||||
|
||||
@@ -29,3 +29,17 @@ type TraceSummary struct {
|
||||
End time.Time `ch:"end"`
|
||||
NumSpans uint64 `ch:"num_spans"`
|
||||
}
|
||||
|
||||
// AttributeValue looks up an attribute across string, number, and bool maps in priority order.
|
||||
func (s SpanItemV2) AttributeValue(name string) any {
|
||||
if v, ok := s.Attributes_string[name]; ok && v != "" {
|
||||
return v
|
||||
}
|
||||
if v, ok := s.Attributes_number[name]; ok {
|
||||
return v
|
||||
}
|
||||
if v, ok := s.Attributes_bool[name]; ok {
|
||||
return v
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
50
pkg/types/tracedetailtypes/aggregation.go
Normal file
50
pkg/types/tracedetailtypes/aggregation.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package tracedetailtypes
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
const maxAggregationItems = 10
|
||||
|
||||
var ErrTooManyAggregationItems = errors.NewInvalidInputf(errors.CodeInvalidInput, "aggregations request exceeds maximum of %d items", maxAggregationItems)
|
||||
|
||||
// SpanAggregationType defines the aggregation to compute over spans grouped by a field.
|
||||
type SpanAggregationType string
|
||||
|
||||
const (
|
||||
SpanAggregationSpanCount SpanAggregationType = "spanCount"
|
||||
SpanAggregationExecutionTimePercentage SpanAggregationType = "executionTimePercentage"
|
||||
SpanAggregationDuration SpanAggregationType = "duration"
|
||||
)
|
||||
|
||||
// SpanAggregation is a single aggregation request item: which field to group by and how.
|
||||
type SpanAggregation struct {
|
||||
Field telemetrytypes.TelemetryFieldKey `json:"field"`
|
||||
Aggregation SpanAggregationType `json:"aggregation"`
|
||||
}
|
||||
|
||||
// SpanAggregationResult is the computed result for one aggregation request item.
|
||||
// Duration values are in milliseconds.
|
||||
type SpanAggregationResult struct {
|
||||
Field telemetrytypes.TelemetryFieldKey `json:"field"`
|
||||
Aggregation SpanAggregationType `json:"aggregation"`
|
||||
Value map[string]uint64 `json:"value" nullable:"true"`
|
||||
}
|
||||
|
||||
func (s SpanAggregationType) Enum() []any {
|
||||
return []any{
|
||||
SpanAggregationSpanCount,
|
||||
SpanAggregationExecutionTimePercentage,
|
||||
SpanAggregationDuration,
|
||||
}
|
||||
}
|
||||
|
||||
func (s SpanAggregationType) isValid() bool {
|
||||
for _, v := range s.Enum() {
|
||||
if v == s {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
240
pkg/types/tracedetailtypes/aggregation_test.go
Normal file
240
pkg/types/tracedetailtypes/aggregation_test.go
Normal file
@@ -0,0 +1,240 @@
|
||||
package tracedetailtypes
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// mkASpan builds a WaterfallSpan with timing and field data for analytics tests.
|
||||
func mkASpan(id string, resource map[string]string, attributes map[string]any, startNs, durationNs uint64) *WaterfallSpan {
|
||||
return &WaterfallSpan{
|
||||
SpanID: id,
|
||||
Resource: resource,
|
||||
Attributes: attributes,
|
||||
TimeUnixNano: startNs,
|
||||
DurationNano: durationNs,
|
||||
Children: make([]*WaterfallSpan, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func buildTraceFromSpans(spans ...*WaterfallSpan) *WaterfallTrace {
|
||||
spanMap := make(map[string]*WaterfallSpan, len(spans))
|
||||
var startTime, endTime uint64
|
||||
initialized := false
|
||||
for _, s := range spans {
|
||||
spanMap[s.SpanID] = s
|
||||
if !initialized || s.TimeUnixNano < startTime {
|
||||
startTime = s.TimeUnixNano
|
||||
initialized = true
|
||||
}
|
||||
if end := s.TimeUnixNano + s.DurationNano; end > endTime {
|
||||
endTime = end
|
||||
}
|
||||
}
|
||||
return NewWaterfallTrace(startTime, endTime, uint64(len(spanMap)), 0, spanMap, nil, nil, false)
|
||||
}
|
||||
|
||||
var (
|
||||
fieldServiceName = telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
}
|
||||
fieldHTTPMethod = telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
}
|
||||
fieldCached = telemetrytypes.TelemetryFieldKey{
|
||||
Name: "db.cached",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
}
|
||||
)
|
||||
|
||||
func TestGetSpanAggregation_SpanCount(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
trace *WaterfallTrace
|
||||
field telemetrytypes.TelemetryFieldKey
|
||||
want map[string]uint64
|
||||
}{
|
||||
{
|
||||
name: "counts by resource field",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", map[string]string{"service.name": "frontend"}, nil, 0, 10),
|
||||
mkASpan("s2", map[string]string{"service.name": "frontend"}, nil, 10, 5),
|
||||
mkASpan("s3", map[string]string{"service.name": "backend"}, nil, 20, 8),
|
||||
),
|
||||
field: fieldServiceName,
|
||||
want: map[string]uint64{"frontend": 2, "backend": 1},
|
||||
},
|
||||
{
|
||||
name: "counts by string attribute field",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", nil, map[string]any{"http.method": "GET"}, 0, 10),
|
||||
mkASpan("s2", nil, map[string]any{"http.method": "POST"}, 10, 5),
|
||||
mkASpan("s3", nil, map[string]any{"http.method": "GET"}, 20, 8),
|
||||
),
|
||||
field: fieldHTTPMethod,
|
||||
want: map[string]uint64{"GET": 2, "POST": 1},
|
||||
},
|
||||
{
|
||||
name: "counts by boolean attribute field",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", nil, map[string]any{"db.cached": true}, 0, 10),
|
||||
mkASpan("s2", nil, map[string]any{"db.cached": false}, 10, 5),
|
||||
mkASpan("s3", nil, map[string]any{"db.cached": true}, 20, 8),
|
||||
),
|
||||
field: fieldCached,
|
||||
want: map[string]uint64{"true": 2, "false": 1},
|
||||
},
|
||||
{
|
||||
name: "spans missing the field are excluded",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", map[string]string{"service.name": "frontend"}, nil, 0, 10),
|
||||
mkASpan("s2", map[string]string{}, nil, 10, 5), // no service.name
|
||||
mkASpan("s3", map[string]string{"service.name": "backend"}, nil, 20, 8),
|
||||
),
|
||||
field: fieldServiceName,
|
||||
want: map[string]uint64{"frontend": 1, "backend": 1},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
result := tc.trace.GetSpanAggregation(SpanAggregationSpanCount, tc.field)
|
||||
assert.Equal(t, tc.field, result.Field)
|
||||
assert.Equal(t, SpanAggregationSpanCount, result.Aggregation)
|
||||
assert.Equal(t, tc.want, result.Value)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetSpanAggregation_Duration(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
trace *WaterfallTrace
|
||||
field telemetrytypes.TelemetryFieldKey
|
||||
want map[string]uint64
|
||||
}{
|
||||
{
|
||||
name: "non-overlapping spans — merged equals sum",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", map[string]string{"service.name": "frontend"}, nil, 0, 100),
|
||||
mkASpan("s2", map[string]string{"service.name": "frontend"}, nil, 100, 50),
|
||||
mkASpan("s3", map[string]string{"service.name": "backend"}, nil, 0, 80),
|
||||
),
|
||||
field: fieldServiceName,
|
||||
want: map[string]uint64{"frontend": 150, "backend": 80},
|
||||
},
|
||||
{
|
||||
name: "non-overlapping attribute groups — merged equals sum",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", nil, map[string]any{"http.method": "GET"}, 0, 30),
|
||||
mkASpan("s2", nil, map[string]any{"http.method": "GET"}, 50, 20),
|
||||
mkASpan("s3", nil, map[string]any{"http.method": "POST"}, 0, 70),
|
||||
),
|
||||
field: fieldHTTPMethod,
|
||||
want: map[string]uint64{"GET": 50, "POST": 70},
|
||||
},
|
||||
{
|
||||
name: "overlapping spans — non-overlapping interval merge",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", map[string]string{"service.name": "svc"}, nil, 0, 10),
|
||||
mkASpan("s2", map[string]string{"service.name": "svc"}, nil, 5, 10),
|
||||
),
|
||||
field: fieldServiceName,
|
||||
want: map[string]uint64{"svc": 15}, // [0,10] ∪ [5,15] = [0,15]
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
result := tc.trace.GetSpanAggregation(SpanAggregationDuration, tc.field)
|
||||
assert.Equal(t, tc.field, result.Field)
|
||||
assert.Equal(t, SpanAggregationDuration, result.Aggregation)
|
||||
assert.Equal(t, tc.want, result.Value)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetSpanAggregation_ExecutionTimePercentage(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
trace *WaterfallTrace
|
||||
field telemetrytypes.TelemetryFieldKey
|
||||
want map[string]uint64
|
||||
}{
|
||||
{
|
||||
// trace [0,30]: svc occupies [0,10]+[20,30]=20 → 20*100/30 = 66%
|
||||
name: "non-overlapping spans",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", map[string]string{"service.name": "svc"}, nil, 0, 10),
|
||||
mkASpan("s2", map[string]string{"service.name": "svc"}, nil, 20, 10),
|
||||
),
|
||||
field: fieldServiceName,
|
||||
want: map[string]uint64{"svc": 66},
|
||||
},
|
||||
{
|
||||
// trace [0,15]: svc [0,15]=15 → 100%
|
||||
name: "partially overlapping spans",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", map[string]string{"service.name": "svc"}, nil, 0, 10),
|
||||
mkASpan("s2", map[string]string{"service.name": "svc"}, nil, 5, 10),
|
||||
),
|
||||
field: fieldServiceName,
|
||||
want: map[string]uint64{"svc": 100},
|
||||
},
|
||||
{
|
||||
// trace [0,20]: outer absorbs inner → 100%
|
||||
name: "fully contained span",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("outer", map[string]string{"service.name": "svc"}, nil, 0, 20),
|
||||
mkASpan("inner", map[string]string{"service.name": "svc"}, nil, 5, 5),
|
||||
),
|
||||
field: fieldServiceName,
|
||||
want: map[string]uint64{"svc": 100},
|
||||
},
|
||||
{
|
||||
// trace [0,30]: svc [0,15]+[20,30]=25 → 25*100/30 = 83%
|
||||
name: "three spans with two merges",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", map[string]string{"service.name": "svc"}, nil, 0, 10),
|
||||
mkASpan("s2", map[string]string{"service.name": "svc"}, nil, 5, 10),
|
||||
mkASpan("s3", map[string]string{"service.name": "svc"}, nil, 20, 10),
|
||||
),
|
||||
field: fieldServiceName,
|
||||
want: map[string]uint64{"svc": 83},
|
||||
},
|
||||
{
|
||||
// trace [0,28]: frontend [0,15]=15 → 53%, backend [0,5]+[20,28]=13 → 46%
|
||||
name: "independent groups are computed separately",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("a1", map[string]string{"service.name": "frontend"}, nil, 0, 10),
|
||||
mkASpan("a2", map[string]string{"service.name": "frontend"}, nil, 5, 10),
|
||||
mkASpan("b1", map[string]string{"service.name": "backend"}, nil, 0, 5),
|
||||
mkASpan("b2", map[string]string{"service.name": "backend"}, nil, 20, 8),
|
||||
),
|
||||
field: fieldServiceName,
|
||||
want: map[string]uint64{"frontend": 53, "backend": 46},
|
||||
},
|
||||
{
|
||||
// trace [100,150]: svc [100,150]=50 → 100%
|
||||
name: "single span",
|
||||
trace: buildTraceFromSpans(
|
||||
mkASpan("s1", map[string]string{"service.name": "svc"}, nil, 100, 50),
|
||||
),
|
||||
field: fieldServiceName,
|
||||
want: map[string]uint64{"svc": 100},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
result := tc.trace.GetSpanAggregation(SpanAggregationExecutionTimePercentage, tc.field)
|
||||
assert.Equal(t, tc.field, result.Field)
|
||||
assert.Equal(t, SpanAggregationExecutionTimePercentage, result.Aggregation)
|
||||
assert.Equal(t, tc.want, result.Value)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -2,11 +2,13 @@ package tracedetailtypes
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"maps"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -21,9 +23,27 @@ var ErrTraceNotFound = errors.NewNotFoundf(errors.CodeNotFound, "trace not found
|
||||
|
||||
// PostableWaterfall is the request body for the v3 waterfall API.
|
||||
type PostableWaterfall struct {
|
||||
SelectedSpanID string `json:"selectedSpanId"`
|
||||
UncollapsedSpans []string `json:"uncollapsedSpans"`
|
||||
Limit uint `json:"limit"`
|
||||
SelectedSpanID string `json:"selectedSpanId"`
|
||||
UncollapsedSpans []string `json:"uncollapsedSpans"`
|
||||
Limit uint `json:"limit"`
|
||||
Aggregations []SpanAggregation `json:"aggregations"`
|
||||
}
|
||||
|
||||
func (p *PostableWaterfall) Validate() error {
|
||||
if len(p.Aggregations) > maxAggregationItems {
|
||||
return ErrTooManyAggregationItems
|
||||
}
|
||||
for _, a := range p.Aggregations {
|
||||
if !a.Aggregation.isValid() {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "unknown aggregation type: %q", a.Aggregation)
|
||||
}
|
||||
fc := a.Field.FieldContext
|
||||
if fc != telemetrytypes.FieldContextResource && fc != telemetrytypes.FieldContextAttribute {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "aggregation field context must be %q or %q, got %q",
|
||||
telemetrytypes.FieldContextResource, telemetrytypes.FieldContextAttribute, fc)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Event represents a span event.
|
||||
@@ -160,7 +180,24 @@ func (ws *WaterfallSpan) GetSubtreeNodeCount() uint64 {
|
||||
return count
|
||||
}
|
||||
|
||||
// getPreOrderedSpans returns spans in pre-order, uncollapsedSpanIDs must be pre-computed.
|
||||
// FieldValue returns the string representation of field's value on this span for grouping.
|
||||
// The bool reports whether the field was present with a non-empty value.
|
||||
func (ws *WaterfallSpan) FieldValue(field telemetrytypes.TelemetryFieldKey) (string, bool) {
|
||||
switch field.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
v := ws.Resource[field.Name]
|
||||
return v, v != ""
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
v, ok := ws.Attributes[field.Name]
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
str := fmt.Sprintf("%v", v)
|
||||
return str, str != ""
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
func (ws *WaterfallSpan) getPreOrderedSpans(uncollapsedSpanIDs map[string]struct{}, selectAll bool, level uint64) []*WaterfallSpan {
|
||||
result := []*WaterfallSpan{ws.GetWithoutChildren(level)}
|
||||
_, isUncollapsed := uncollapsedSpanIDs[ws.SpanID]
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/cachetypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
type TraceSummary struct {
|
||||
@@ -31,17 +32,19 @@ type WaterfallTrace struct {
|
||||
|
||||
// GettableWaterfallTrace is the response for the v3 waterfall API.
|
||||
type GettableWaterfallTrace struct {
|
||||
StartTimestampMillis uint64 `json:"startTimestampMillis"`
|
||||
EndTimestampMillis uint64 `json:"endTimestampMillis"`
|
||||
RootServiceName string `json:"rootServiceName"`
|
||||
RootServiceEntryPoint string `json:"rootServiceEntryPoint"`
|
||||
TotalSpansCount uint64 `json:"totalSpansCount"`
|
||||
TotalErrorSpansCount uint64 `json:"totalErrorSpansCount"`
|
||||
ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"`
|
||||
Spans []*WaterfallSpan `json:"spans"`
|
||||
HasMissingSpans bool `json:"hasMissingSpans"`
|
||||
UncollapsedSpans []string `json:"uncollapsedSpans"`
|
||||
HasMore bool `json:"hasMore"`
|
||||
StartTimestampMillis uint64 `json:"startTimestampMillis"`
|
||||
EndTimestampMillis uint64 `json:"endTimestampMillis"`
|
||||
RootServiceName string `json:"rootServiceName"`
|
||||
RootServiceEntryPoint string `json:"rootServiceEntryPoint"`
|
||||
TotalSpansCount uint64 `json:"totalSpansCount"`
|
||||
TotalErrorSpansCount uint64 `json:"totalErrorSpansCount"`
|
||||
// Deprecated: use Aggregations with SpanAggregationExecutionTimePercentage on the service.name field instead.
|
||||
ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"`
|
||||
Spans []*WaterfallSpan `json:"spans"`
|
||||
HasMissingSpans bool `json:"hasMissingSpans"`
|
||||
UncollapsedSpans []string `json:"uncollapsedSpans"`
|
||||
HasMore bool `json:"hasMore"`
|
||||
Aggregations []SpanAggregationResult `json:"aggregations"`
|
||||
}
|
||||
|
||||
// NewWaterfallTrace constructs a WaterfallTrace from processed span data.
|
||||
@@ -240,12 +243,13 @@ func (wt *WaterfallTrace) UnmarshalBinary(data []byte) error {
|
||||
return json.Unmarshal(data, wt)
|
||||
}
|
||||
|
||||
// NewGettableWaterfallTrace constructs a WaterfallResponse from processed trace data and selected spans.
|
||||
// NewGettableWaterfallTrace constructs a GettableWaterfallTrace from processed trace data and selected spans.
|
||||
func NewGettableWaterfallTrace(
|
||||
traceData *WaterfallTrace,
|
||||
selectedSpans []*WaterfallSpan,
|
||||
uncollapsedSpans []string,
|
||||
selectAllSpans bool,
|
||||
aggregations []SpanAggregationResult,
|
||||
) *GettableWaterfallTrace {
|
||||
var rootServiceName, rootServiceEntryPoint string
|
||||
if len(traceData.TraceRoots) > 0 {
|
||||
@@ -263,6 +267,15 @@ func NewGettableWaterfallTrace(
|
||||
span.TimeUnixNano = span.TimeUnixNano / 1_000_000
|
||||
}
|
||||
|
||||
// duration values are in nanoseconds; convert in-place to milliseconds.
|
||||
for i := range aggregations {
|
||||
if aggregations[i].Aggregation == SpanAggregationDuration {
|
||||
for k, v := range aggregations[i].Value {
|
||||
aggregations[i].Value[k] = v / 1_000_000
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &GettableWaterfallTrace{
|
||||
Spans: selectedSpans,
|
||||
UncollapsedSpans: uncollapsedSpans,
|
||||
@@ -275,6 +288,7 @@ func NewGettableWaterfallTrace(
|
||||
ServiceNameToTotalDurationMap: serviceDurationsMillis,
|
||||
HasMissingSpans: traceData.HasMissingSpans,
|
||||
HasMore: !selectAllSpans,
|
||||
Aggregations: aggregations,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -307,29 +321,82 @@ func calculateServiceTime(spanIDToSpanNodeMap map[string]*WaterfallSpan) map[str
|
||||
|
||||
totalTimes := make(map[string]uint64)
|
||||
for service, spans := range serviceSpans {
|
||||
sort.Slice(spans, func(i, j int) bool {
|
||||
return spans[i].TimeUnixNano < spans[j].TimeUnixNano
|
||||
})
|
||||
|
||||
currentStart := spans[0].TimeUnixNano
|
||||
currentEnd := currentStart + spans[0].DurationNano
|
||||
total := uint64(0)
|
||||
|
||||
for _, span := range spans[1:] {
|
||||
startNano := span.TimeUnixNano
|
||||
endNano := startNano + span.DurationNano
|
||||
if currentEnd >= startNano {
|
||||
if endNano > currentEnd {
|
||||
currentEnd = endNano
|
||||
}
|
||||
} else {
|
||||
total += currentEnd - currentStart
|
||||
currentStart = startNano
|
||||
currentEnd = endNano
|
||||
}
|
||||
}
|
||||
total += currentEnd - currentStart
|
||||
totalTimes[service] = total
|
||||
totalTimes[service] = mergeSpanIntervals(spans)
|
||||
}
|
||||
return totalTimes
|
||||
}
|
||||
|
||||
// mergeSpanIntervals computes non-overlapping execution time for a set of spans.
|
||||
func mergeSpanIntervals(spans []*WaterfallSpan) uint64 {
|
||||
if len(spans) == 0 {
|
||||
return 0
|
||||
}
|
||||
sort.Slice(spans, func(i, j int) bool {
|
||||
return spans[i].TimeUnixNano < spans[j].TimeUnixNano
|
||||
})
|
||||
|
||||
currentStart := spans[0].TimeUnixNano
|
||||
currentEnd := currentStart + spans[0].DurationNano
|
||||
total := uint64(0)
|
||||
|
||||
for _, span := range spans[1:] {
|
||||
startNano := span.TimeUnixNano
|
||||
endNano := startNano + span.DurationNano
|
||||
if currentEnd >= startNano {
|
||||
if endNano > currentEnd {
|
||||
currentEnd = endNano
|
||||
}
|
||||
} else {
|
||||
total += currentEnd - currentStart
|
||||
currentStart = startNano
|
||||
currentEnd = endNano
|
||||
}
|
||||
}
|
||||
return total + (currentEnd - currentStart)
|
||||
}
|
||||
|
||||
// GetSpanAggregation computes one aggregation result over all spans in the trace.
|
||||
// Duration values are returned in nanoseconds; callers convert to milliseconds as needed.
|
||||
func (wt *WaterfallTrace) GetSpanAggregation(aggregation SpanAggregationType, field telemetrytypes.TelemetryFieldKey) SpanAggregationResult {
|
||||
result := SpanAggregationResult{
|
||||
Field: field,
|
||||
Aggregation: aggregation,
|
||||
Value: make(map[string]uint64),
|
||||
}
|
||||
|
||||
switch aggregation {
|
||||
case SpanAggregationSpanCount:
|
||||
for _, span := range wt.SpanIDToSpanNodeMap {
|
||||
if key, ok := span.FieldValue(field); ok {
|
||||
result.Value[key]++
|
||||
}
|
||||
}
|
||||
|
||||
case SpanAggregationDuration:
|
||||
spansByField := make(map[string][]*WaterfallSpan)
|
||||
for _, span := range wt.SpanIDToSpanNodeMap {
|
||||
if key, ok := span.FieldValue(field); ok {
|
||||
spansByField[key] = append(spansByField[key], span)
|
||||
}
|
||||
}
|
||||
for key, spans := range spansByField {
|
||||
result.Value[key] = mergeSpanIntervals(spans)
|
||||
}
|
||||
|
||||
case SpanAggregationExecutionTimePercentage:
|
||||
traceDuration := wt.EndTime - wt.StartTime
|
||||
spansByField := make(map[string][]*WaterfallSpan)
|
||||
for _, span := range wt.SpanIDToSpanNodeMap {
|
||||
if key, ok := span.FieldValue(field); ok {
|
||||
spansByField[key] = append(spansByField[key], span)
|
||||
}
|
||||
}
|
||||
if traceDuration > 0 {
|
||||
for key, spans := range spansByField {
|
||||
result.Value[key] = mergeSpanIntervals(spans) * 100 / traceDuration
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user