Compare commits

...

3 Commits

Author SHA1 Message Date
Naman Verma
018c78f4ff fix: add metadata fetch to list metrics query to avoid CH timeouts due to sequential queries 2026-04-09 17:50:18 +05:30
Naman Verma
31e2c1b585 fix: combine metadata fetch into stats query to avoid ClickHouse timeouts due to sequential queries 2026-04-09 17:26:13 +05:30
Nikhil Soni
6d1d028d4c refactor: setup types and interface for waterfall v3 (#10794)
* feat: setup types and interface for waterfall v3

v3 is required for udpating the response json of
the waterfall api. There wont' be any logical change.
Using this requirement as an opportunity to move
waterfall api to provider codebase architecture from
older query-service

* refactor: move type conversion logic to types pkg

* chore: add reason for using snake case in response

* fix: update span.attributes to map of string to any

To support otel format of diffrent types of attributes

* fix: remove unused fields and rename span type

To avoid confusing with otel span

* chore: rename resources field to follow otel

---------

Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
2026-04-09 05:21:33 +00:00
3 changed files with 312 additions and 29 deletions

View File

@@ -138,7 +138,14 @@ func (m *module) listMeterMetrics(ctx context.Context, params *metricsexplorerty
func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *metricsexplorertypes.ListMetricsParams) (*metricsexplorertypes.ListMetricsResponse, error) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("DISTINCT metric_name")
sb.Select(
"metric_name",
"anyLast(description) AS description",
"anyLast(type) AS metric_type",
"argMax(unit, unix_milli) AS metric_unit",
"anyLast(temporality) AS temporality",
"anyLast(is_monotonic) AS is_monotonic",
)
if params.Start != nil && params.End != nil {
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(*params.Start), uint64(*params.End), nil)
@@ -157,6 +164,7 @@ func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *met
sb.Where(sb.Like("lower(metric_name)", fmt.Sprintf("%%%s%%", searchLower)))
}
sb.GroupBy("metric_name")
sb.OrderBy("metric_name ASC")
sb.Limit(params.Limit)
@@ -170,43 +178,47 @@ func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *met
}
defer rows.Close()
metricNames := make([]string, 0)
var metrics []metricsexplorertypes.ListMetric
var metricNames []string
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metric name")
var metric metricsexplorertypes.ListMetric
if err := rows.Scan(
&metric.MetricName,
&metric.Description,
&metric.MetricType,
&metric.MetricUnit,
&metric.Temporality,
&metric.IsMonotonic,
); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metric")
}
metricNames = append(metricNames, name)
metrics = append(metrics, metric)
metricNames = append(metricNames, metric.MetricName)
}
if err := rows.Err(); err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metric names")
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metrics")
}
if len(metricNames) == 0 {
if len(metrics) == 0 {
return &metricsexplorertypes.ListMetricsResponse{
Metrics: []metricsexplorertypes.ListMetric{},
}, nil
}
metadata, err := m.GetMetricMetadataMulti(ctx, orgID, metricNames)
// Overlay any user-updated metadata on top of the timeseries metadata.
updatedMetadata, err := m.fetchUpdatedMetadata(ctx, orgID, metricNames)
if err != nil {
return nil, err
}
metrics := make([]metricsexplorertypes.ListMetric, 0, len(metricNames))
for _, name := range metricNames {
metric := metricsexplorertypes.ListMetric{
MetricName: name,
for i := range metrics {
if meta, ok := updatedMetadata[metrics[i].MetricName]; ok && meta != nil {
metrics[i].Description = meta.Description
metrics[i].MetricType = meta.MetricType
metrics[i].MetricUnit = meta.MetricUnit
metrics[i].Temporality = meta.Temporality
metrics[i].IsMonotonic = meta.IsMonotonic
}
if meta, ok := metadata[name]; ok && meta != nil {
metric.Description = meta.Description
metric.MetricType = meta.MetricType
metric.MetricUnit = meta.MetricUnit
metric.Temporality = meta.Temporality
metric.IsMonotonic = meta.IsMonotonic
}
metrics = append(metrics, metric)
}
return &metricsexplorertypes.ListMetricsResponse{
@@ -243,19 +255,18 @@ func (m *module) GetStats(ctx context.Context, orgID valuer.UUID, req *metricsex
}, nil
}
// Get metadata for all metrics
// Overlay any user-updated metadata on top of the timeseries metadata
// that was already fetched in the combined query.
metricNames := make([]string, len(metricStats))
for i := range metricStats {
metricNames[i] = metricStats[i].MetricName
}
metadata, err := m.GetMetricMetadataMulti(ctx, orgID, metricNames)
updatedMetadata, err := m.fetchUpdatedMetadata(ctx, orgID, metricNames)
if err != nil {
return nil, err
}
// Enrich stats with metadata
enrichStatsWithMetadata(metricStats, metadata)
enrichStatsWithMetadata(metricStats, updatedMetadata)
return &metricsexplorertypes.StatsResponse{
Metrics: metricStats,
@@ -984,11 +995,14 @@ func (m *module) fetchMetricsStatsWithSamples(
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
// Timeseries counts per metric
// Timeseries counts and metadata per metric.
tsSB := sqlbuilder.NewSelectBuilder()
tsSB.Select(
"metric_name",
"uniq(fingerprint) AS timeseries",
"anyLast(description) AS description",
"anyLast(type) AS metric_type",
"argMax(unit, unix_milli) AS metric_unit",
)
tsSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
tsSB.Where(tsSB.Between("unix_milli", start, end))
@@ -1036,6 +1050,9 @@ func (m *module) fetchMetricsStatsWithSamples(
"COALESCE(ts.timeseries, 0) AS timeseries",
"COALESCE(s.samples, 0) AS samples",
"COUNT(*) OVER() AS total",
"ts.description AS description",
"ts.metric_type AS metric_type",
"ts.metric_unit AS metric_unit",
)
finalSB.From("__time_series_counts ts")
finalSB.JoinWithOption(sqlbuilder.FullOuterJoin, "__sample_counts s", "ts.metric_name = s.metric_name")
@@ -1071,7 +1088,7 @@ func (m *module) fetchMetricsStatsWithSamples(
metricStat metricsexplorertypes.Stat
rowTotal uint64
)
if err := rows.Scan(&metricStat.MetricName, &metricStat.TimeSeries, &metricStat.Samples, &rowTotal); err != nil {
if err := rows.Scan(&metricStat.MetricName, &metricStat.TimeSeries, &metricStat.Samples, &rowTotal, &metricStat.Description, &metricStat.MetricType, &metricStat.MetricUnit); err != nil {
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metrics stats row")
}
metricStats = append(metricStats, metricStat)

View File

@@ -0,0 +1,19 @@
package tracedetail
import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/types/tracedetailtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Handler exposes HTTP handlers for trace detail APIs.
type Handler interface {
GetWaterfall(http.ResponseWriter, *http.Request)
}
// Module defines the business logic for trace detail operations.
type Module interface {
GetWaterfall(ctx context.Context, orgID valuer.UUID, traceID string, req *tracedetailtypes.WaterfallRequest) (*tracedetailtypes.WaterfallResponse, error)
}

View File

@@ -0,0 +1,247 @@
package tracedetailtypes
import (
"encoding/json"
"maps"
"time"
"github.com/SigNoz/signoz/pkg/types/cachetypes"
)
// WaterfallRequest is the request body for the v3 waterfall API.
type WaterfallRequest struct {
SelectedSpanID string `json:"selectedSpanId"`
UncollapsedSpans []string `json:"uncollapsedSpans"`
Limit uint `json:"limit"`
}
// WaterfallResponse is the response for the v3 waterfall API.
type WaterfallResponse struct {
StartTimestampMillis uint64 `json:"startTimestampMillis"`
EndTimestampMillis uint64 `json:"endTimestampMillis"`
DurationNano uint64 `json:"durationNano"`
RootServiceName string `json:"rootServiceName"`
RootServiceEntryPoint string `json:"rootServiceEntryPoint"`
TotalSpansCount uint64 `json:"totalSpansCount"`
TotalErrorSpansCount uint64 `json:"totalErrorSpansCount"`
ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"`
Spans []*WaterfallSpan `json:"spans"`
HasMissingSpans bool `json:"hasMissingSpans"`
UncollapsedSpans []string `json:"uncollapsedSpans"`
}
// Event represents a span event.
type Event struct {
Name string `json:"name,omitempty"`
TimeUnixNano uint64 `json:"timeUnixNano,omitempty"`
AttributeMap map[string]any `json:"attributeMap,omitempty"`
IsError bool `json:"isError,omitempty"`
}
// WaterfallSpan represents the span in waterfall response,
// this uses snake_case keys for response as a special case since these
// keys can be directly used to query spans and client need to know the actual fields.
// This pattern should not be copied elsewhere.
type WaterfallSpan struct {
Attributes map[string]any `json:"attributes"`
DBName string `json:"db_name"`
DBOperation string `json:"db_operation"`
DurationNano uint64 `json:"duration_nano"`
Events []Event `json:"events"`
ExternalHTTPMethod string `json:"external_http_method"`
ExternalHTTPURL string `json:"external_http_url"`
Flags uint32 `json:"flags"`
HasError bool `json:"has_error"`
HTTPHost string `json:"http_host"`
HTTPMethod string `json:"http_method"`
HTTPURL string `json:"http_url"`
IsRemote string `json:"is_remote"`
Kind int32 `json:"kind"`
KindString string `json:"kind_string"`
Links string `json:"links"`
Name string `json:"name"`
ParentSpanID string `json:"parent_span_id"`
Resource map[string]string `json:"resource"`
ResponseStatusCode string `json:"response_status_code"`
SpanID string `json:"span_id"`
StatusCode int32 `json:"status_code"`
StatusCodeString string `json:"status_code_string"`
StatusMessage string `json:"status_message"`
Timestamp string `json:"timestamp"`
TraceID string `json:"trace_id"`
TraceState string `json:"trace_state"`
// Tree structure fields
Children []*WaterfallSpan `json:"-"`
SubTreeNodeCount uint64 `json:"sub_tree_node_count"`
HasChildren bool `json:"has_children"`
Level uint64 `json:"level"`
// timeUnixNano is an internal field used for tree building and sorting.
// It is not serialized in the JSON response.
TimeUnixNano uint64 `json:"-"`
// serviceName is an internal field used for service time calculation.
ServiceName string `json:"-"`
}
// CopyWithoutChildren creates a shallow copy and reset computed tree fields.
func (s *WaterfallSpan) CopyWithoutChildren(level uint64) *WaterfallSpan {
cp := *s
cp.Level = level
cp.HasChildren = len(s.Children) > 0
cp.Children = make([]*WaterfallSpan, 0)
cp.SubTreeNodeCount = 0
return &cp
}
// SpanModel is the ClickHouse scan struct for the v3 waterfall query.
type SpanModel struct {
TimeUnixNano time.Time `ch:"timestamp"`
DurationNano uint64 `ch:"duration_nano"`
SpanID string `ch:"span_id"`
TraceID string `ch:"trace_id"`
HasError bool `ch:"has_error"`
Kind int8 `ch:"kind"`
ServiceName string `ch:"resource_string_service$$name"`
Name string `ch:"name"`
References string `ch:"references"`
AttributesString map[string]string `ch:"attributes_string"`
AttributesNumber map[string]float64 `ch:"attributes_number"`
AttributesBool map[string]bool `ch:"attributes_bool"`
ResourcesString map[string]string `ch:"resources_string"`
Events []string `ch:"events"`
StatusMessage string `ch:"status_message"`
StatusCodeString string `ch:"status_code_string"`
SpanKind string `ch:"kind_string"`
ParentSpanID string `ch:"parent_span_id"`
Flags uint32 `ch:"flags"`
IsRemote string `ch:"is_remote"`
TraceState string `ch:"trace_state"`
StatusCode int32 `ch:"status_code"`
DBName string `ch:"db_name"`
DBOperation string `ch:"db_operation"`
HTTPMethod string `ch:"http_method"`
HTTPURL string `ch:"http_url"`
HTTPHost string `ch:"http_host"`
ExternalHTTPMethod string `ch:"external_http_method"`
ExternalHTTPURL string `ch:"external_http_url"`
ResponseStatusCode string `ch:"response_status_code"`
}
// ToSpan converts a SpanModel (ClickHouse scan result) into a Span for the waterfall response.
func (item *SpanModel) ToSpan() *WaterfallSpan {
// Merge attributes_string, attributes_number, attributes_bool preserving native types
attributes := make(map[string]any, len(item.AttributesString)+len(item.AttributesNumber)+len(item.AttributesBool))
for k, v := range item.AttributesString {
attributes[k] = v
}
for k, v := range item.AttributesNumber {
attributes[k] = v
}
for k, v := range item.AttributesBool {
attributes[k] = v
}
resources := make(map[string]string)
maps.Copy(resources, item.ResourcesString)
events := make([]Event, 0, len(item.Events))
for _, eventStr := range item.Events {
var event Event
if err := json.Unmarshal([]byte(eventStr), &event); err != nil {
continue
}
events = append(events, event)
}
return &WaterfallSpan{
Attributes: attributes,
DBName: item.DBName,
DBOperation: item.DBOperation,
DurationNano: item.DurationNano,
Events: events,
ExternalHTTPMethod: item.ExternalHTTPMethod,
ExternalHTTPURL: item.ExternalHTTPURL,
Flags: item.Flags,
HasError: item.HasError,
HTTPHost: item.HTTPHost,
HTTPMethod: item.HTTPMethod,
HTTPURL: item.HTTPURL,
IsRemote: item.IsRemote,
Kind: int32(item.Kind),
KindString: item.SpanKind,
Links: item.References,
Name: item.Name,
ParentSpanID: item.ParentSpanID,
Resource: resources,
ResponseStatusCode: item.ResponseStatusCode,
SpanID: item.SpanID,
StatusCode: item.StatusCode,
StatusCodeString: item.StatusCodeString,
StatusMessage: item.StatusMessage,
Timestamp: item.TimeUnixNano.Format(time.RFC3339Nano),
TraceID: item.TraceID,
TraceState: item.TraceState,
Children: make([]*WaterfallSpan, 0),
TimeUnixNano: uint64(item.TimeUnixNano.UnixNano()),
ServiceName: item.ServiceName,
}
}
// TraceSummary is the ClickHouse scan struct for the trace_summary query.
type TraceSummary struct {
TraceID string `ch:"trace_id"`
Start time.Time `ch:"start"`
End time.Time `ch:"end"`
NumSpans uint64 `ch:"num_spans"`
}
// OtelSpanRef is used for parsing the references/links JSON from ClickHouse.
type OtelSpanRef struct {
TraceId string `json:"traceId,omitempty"`
SpanId string `json:"spanId,omitempty"`
RefType string `json:"refType,omitempty"`
}
// WaterfallCache holds pre-processed trace data for caching.
type WaterfallCache struct {
StartTime uint64 `json:"startTime"`
EndTime uint64 `json:"endTime"`
DurationNano uint64 `json:"durationNano"`
TotalSpans uint64 `json:"totalSpans"`
TotalErrorSpans uint64 `json:"totalErrorSpans"`
ServiceNameToTotalDurationMap map[string]uint64 `json:"serviceNameToTotalDurationMap"`
SpanIDToSpanNodeMap map[string]*WaterfallSpan `json:"spanIdToSpanNodeMap"`
TraceRoots []*WaterfallSpan `json:"traceRoots"`
HasMissingSpans bool `json:"hasMissingSpans"`
}
func (c *WaterfallCache) Clone() cachetypes.Cacheable {
copyOfServiceNameToTotalDurationMap := make(map[string]uint64)
maps.Copy(copyOfServiceNameToTotalDurationMap, c.ServiceNameToTotalDurationMap)
copyOfSpanIDToSpanNodeMap := make(map[string]*WaterfallSpan)
maps.Copy(copyOfSpanIDToSpanNodeMap, c.SpanIDToSpanNodeMap)
copyOfTraceRoots := make([]*WaterfallSpan, len(c.TraceRoots))
copy(copyOfTraceRoots, c.TraceRoots)
return &WaterfallCache{
StartTime: c.StartTime,
EndTime: c.EndTime,
DurationNano: c.DurationNano,
TotalSpans: c.TotalSpans,
TotalErrorSpans: c.TotalErrorSpans,
ServiceNameToTotalDurationMap: copyOfServiceNameToTotalDurationMap,
SpanIDToSpanNodeMap: copyOfSpanIDToSpanNodeMap,
TraceRoots: copyOfTraceRoots,
HasMissingSpans: c.HasMissingSpans,
}
}
func (c *WaterfallCache) MarshalBinary() (data []byte, err error) {
return json.Marshal(c)
}
func (c *WaterfallCache) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, c)
}