mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-27 14:10:30 +01:00
Compare commits
1 Commits
nv/10890
...
issue_4785
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
afee062eaf |
@@ -138,14 +138,7 @@ func (m *module) listMeterMetrics(ctx context.Context, params *metricsexplorerty
|
||||
|
||||
func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *metricsexplorertypes.ListMetricsParams) (*metricsexplorertypes.ListMetricsResponse, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(
|
||||
"metric_name",
|
||||
"anyLast(description) AS description",
|
||||
"anyLast(type) AS metric_type",
|
||||
"argMax(unit, unix_milli) AS metric_unit",
|
||||
"anyLast(temporality) AS temporality",
|
||||
"anyLast(is_monotonic) AS is_monotonic",
|
||||
)
|
||||
sb.Select("DISTINCT metric_name")
|
||||
|
||||
if params.Start != nil && params.End != nil {
|
||||
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(*params.Start), uint64(*params.End), nil)
|
||||
@@ -164,7 +157,6 @@ func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *met
|
||||
sb.Where(sb.Like("lower(metric_name)", fmt.Sprintf("%%%s%%", searchLower)))
|
||||
}
|
||||
|
||||
sb.GroupBy("metric_name")
|
||||
sb.OrderBy("metric_name ASC")
|
||||
sb.Limit(params.Limit)
|
||||
|
||||
@@ -178,47 +170,43 @@ func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *met
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var metrics []metricsexplorertypes.ListMetric
|
||||
var metricNames []string
|
||||
metricNames := make([]string, 0)
|
||||
for rows.Next() {
|
||||
var metric metricsexplorertypes.ListMetric
|
||||
if err := rows.Scan(
|
||||
&metric.MetricName,
|
||||
&metric.Description,
|
||||
&metric.MetricType,
|
||||
&metric.MetricUnit,
|
||||
&metric.Temporality,
|
||||
&metric.IsMonotonic,
|
||||
); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metric")
|
||||
var name string
|
||||
if err := rows.Scan(&name); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metric name")
|
||||
}
|
||||
metrics = append(metrics, metric)
|
||||
metricNames = append(metricNames, metric.MetricName)
|
||||
metricNames = append(metricNames, name)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metrics")
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metric names")
|
||||
}
|
||||
|
||||
if len(metrics) == 0 {
|
||||
if len(metricNames) == 0 {
|
||||
return &metricsexplorertypes.ListMetricsResponse{
|
||||
Metrics: []metricsexplorertypes.ListMetric{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Overlay any user-updated metadata on top of the timeseries metadata.
|
||||
updatedMetadata, err := m.fetchUpdatedMetadata(ctx, orgID, metricNames)
|
||||
metadata, err := m.GetMetricMetadataMulti(ctx, orgID, metricNames)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range metrics {
|
||||
if meta, ok := updatedMetadata[metrics[i].MetricName]; ok && meta != nil {
|
||||
metrics[i].Description = meta.Description
|
||||
metrics[i].MetricType = meta.MetricType
|
||||
metrics[i].MetricUnit = meta.MetricUnit
|
||||
metrics[i].Temporality = meta.Temporality
|
||||
metrics[i].IsMonotonic = meta.IsMonotonic
|
||||
|
||||
metrics := make([]metricsexplorertypes.ListMetric, 0, len(metricNames))
|
||||
for _, name := range metricNames {
|
||||
metric := metricsexplorertypes.ListMetric{
|
||||
MetricName: name,
|
||||
}
|
||||
if meta, ok := metadata[name]; ok && meta != nil {
|
||||
metric.Description = meta.Description
|
||||
metric.MetricType = meta.MetricType
|
||||
metric.MetricUnit = meta.MetricUnit
|
||||
metric.Temporality = meta.Temporality
|
||||
metric.IsMonotonic = meta.IsMonotonic
|
||||
}
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
|
||||
return &metricsexplorertypes.ListMetricsResponse{
|
||||
@@ -255,18 +243,19 @@ func (m *module) GetStats(ctx context.Context, orgID valuer.UUID, req *metricsex
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Overlay any user-updated metadata on top of the timeseries metadata
|
||||
// that was already fetched in the combined query.
|
||||
// Get metadata for all metrics
|
||||
metricNames := make([]string, len(metricStats))
|
||||
for i := range metricStats {
|
||||
metricNames[i] = metricStats[i].MetricName
|
||||
}
|
||||
|
||||
updatedMetadata, err := m.fetchUpdatedMetadata(ctx, orgID, metricNames)
|
||||
metadata, err := m.GetMetricMetadataMulti(ctx, orgID, metricNames)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
enrichStatsWithMetadata(metricStats, updatedMetadata)
|
||||
|
||||
// Enrich stats with metadata
|
||||
enrichStatsWithMetadata(metricStats, metadata)
|
||||
|
||||
return &metricsexplorertypes.StatsResponse{
|
||||
Metrics: metricStats,
|
||||
@@ -995,14 +984,11 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
samplesTable := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
|
||||
countExp := telemetrymetrics.CountExpressionForSamplesTable(samplesTable)
|
||||
|
||||
// Timeseries counts and metadata per metric.
|
||||
// Timeseries counts per metric
|
||||
tsSB := sqlbuilder.NewSelectBuilder()
|
||||
tsSB.Select(
|
||||
"metric_name",
|
||||
"uniq(fingerprint) AS timeseries",
|
||||
"anyLast(description) AS description",
|
||||
"anyLast(type) AS metric_type",
|
||||
"argMax(unit, unix_milli) AS metric_unit",
|
||||
)
|
||||
tsSB.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
|
||||
tsSB.Where(tsSB.Between("unix_milli", start, end))
|
||||
@@ -1050,9 +1036,6 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
"COALESCE(ts.timeseries, 0) AS timeseries",
|
||||
"COALESCE(s.samples, 0) AS samples",
|
||||
"COUNT(*) OVER() AS total",
|
||||
"ts.description AS description",
|
||||
"ts.metric_type AS metric_type",
|
||||
"ts.metric_unit AS metric_unit",
|
||||
)
|
||||
finalSB.From("__time_series_counts ts")
|
||||
finalSB.JoinWithOption(sqlbuilder.FullOuterJoin, "__sample_counts s", "ts.metric_name = s.metric_name")
|
||||
@@ -1088,7 +1071,7 @@ func (m *module) fetchMetricsStatsWithSamples(
|
||||
metricStat metricsexplorertypes.Stat
|
||||
rowTotal uint64
|
||||
)
|
||||
if err := rows.Scan(&metricStat.MetricName, &metricStat.TimeSeries, &metricStat.Samples, &rowTotal, &metricStat.Description, &metricStat.MetricType, &metricStat.MetricUnit); err != nil {
|
||||
if err := rows.Scan(&metricStat.MetricName, &metricStat.TimeSeries, &metricStat.Samples, &rowTotal); err != nil {
|
||||
return nil, 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metrics stats row")
|
||||
}
|
||||
metricStats = append(metricStats, metricStat)
|
||||
|
||||
@@ -627,19 +627,29 @@ func convertTimeSeriesDataToScalar(tsData *qbtypes.TimeSeriesData, queryName str
|
||||
return &qbtypes.ScalarData{QueryName: queryName}
|
||||
}
|
||||
|
||||
columns := []*qbtypes.ColumnDescriptor{}
|
||||
|
||||
// Add group columns from first series
|
||||
if len(tsData.Aggregations[0].Series) > 0 {
|
||||
for _, label := range tsData.Aggregations[0].Series[0].Labels {
|
||||
columns = append(columns, &qbtypes.ColumnDescriptor{
|
||||
TelemetryFieldKey: label.Key,
|
||||
QueryName: queryName,
|
||||
Type: qbtypes.ColumnTypeGroup,
|
||||
})
|
||||
// Series can have ragged label sets; build the column schema from the
|
||||
// union of all label keys (first-seen order) and fill rows by key lookup.
|
||||
keyOrder := []telemetrytypes.TelemetryFieldKey{}
|
||||
keyIndex := map[string]int{}
|
||||
for _, series := range tsData.Aggregations[0].Series {
|
||||
for _, label := range series.Labels {
|
||||
if _, ok := keyIndex[label.Key.Name]; ok {
|
||||
continue
|
||||
}
|
||||
keyIndex[label.Key.Name] = len(keyOrder)
|
||||
keyOrder = append(keyOrder, label.Key)
|
||||
}
|
||||
}
|
||||
|
||||
columns := make([]*qbtypes.ColumnDescriptor, 0, len(keyOrder)+len(tsData.Aggregations))
|
||||
for _, key := range keyOrder {
|
||||
columns = append(columns, &qbtypes.ColumnDescriptor{
|
||||
TelemetryFieldKey: key,
|
||||
QueryName: queryName,
|
||||
Type: qbtypes.ColumnTypeGroup,
|
||||
})
|
||||
}
|
||||
|
||||
// Add aggregation columns
|
||||
for _, agg := range tsData.Aggregations {
|
||||
name := agg.Alias
|
||||
@@ -655,18 +665,18 @@ func convertTimeSeriesDataToScalar(tsData *qbtypes.TimeSeriesData, queryName str
|
||||
})
|
||||
}
|
||||
|
||||
// Build rows
|
||||
// Build rows.
|
||||
groupColCount := len(keyOrder)
|
||||
data := [][]any{}
|
||||
for seriesIdx, series := range tsData.Aggregations[0].Series {
|
||||
row := make([]any, len(columns))
|
||||
|
||||
// Add group values
|
||||
for i, label := range series.Labels {
|
||||
row[i] = label.Value
|
||||
// Place each label under its key's column (by lookup, not index).
|
||||
for _, label := range series.Labels {
|
||||
row[keyIndex[label.Key.Name]] = label.Value
|
||||
}
|
||||
|
||||
// Add aggregation values (last value)
|
||||
groupColCount := len(series.Labels)
|
||||
for aggIdx, agg := range tsData.Aggregations {
|
||||
if seriesIdx < len(agg.Series) && len(agg.Series[seriesIdx].Values) > 0 {
|
||||
lastValue := agg.Series[seriesIdx].Values[len(agg.Series[seriesIdx].Values)-1].Value
|
||||
|
||||
53
pkg/querier/postprocess_test.go
Normal file
53
pkg/querier/postprocess_test.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package querier
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
// Multiple series with different number of labels, shouldn't panic and should align labels correctly.
|
||||
func TestConvertTimeSeriesDataToScalar_RaggedLabels(t *testing.T) {
|
||||
label := func(name string, value any) *qbtypes.Label {
|
||||
return &qbtypes.Label{
|
||||
Key: telemetrytypes.TelemetryFieldKey{Name: name},
|
||||
Value: value,
|
||||
}
|
||||
}
|
||||
series := func(labels []*qbtypes.Label, value float64) *qbtypes.TimeSeries {
|
||||
return &qbtypes.TimeSeries{
|
||||
Labels: labels,
|
||||
Values: []*qbtypes.TimeSeriesValue{{Timestamp: 1, Value: value}},
|
||||
}
|
||||
}
|
||||
|
||||
tsData := &qbtypes.TimeSeriesData{
|
||||
QueryName: "A",
|
||||
Aggregations: []*qbtypes.AggregationBucket{{
|
||||
Index: 0,
|
||||
Series: []*qbtypes.TimeSeries{
|
||||
series([]*qbtypes.Label{label("label_1", "orphan-0")}, 20),
|
||||
series([]*qbtypes.Label{label("label_1", "box-0"), label("label_2", "rpc-0")}, 10),
|
||||
},
|
||||
}},
|
||||
}
|
||||
|
||||
var sd *qbtypes.ScalarData
|
||||
require.NotPanics(t, func() {
|
||||
sd = convertTimeSeriesDataToScalar(tsData, "A")
|
||||
})
|
||||
|
||||
require.NotNil(t, sd)
|
||||
require.Len(t, sd.Columns, 3)
|
||||
assert.Equal(t, "label_1", sd.Columns[0].Name)
|
||||
assert.Equal(t, "label_2", sd.Columns[1].Name)
|
||||
assert.Equal(t, "__result_0", sd.Columns[2].Name)
|
||||
|
||||
require.Len(t, sd.Data, 2)
|
||||
assert.Equal(t, []any{"orphan-0", nil, 20.0}, sd.Data[0])
|
||||
assert.Equal(t, []any{"box-0", "rpc-0", 10.0}, sd.Data[1])
|
||||
}
|
||||
Reference in New Issue
Block a user