Compare commits

...

13 Commits

Author SHA1 Message Date
Naman Verma
d7d907f687 test: max parametrising of the unit tests 2026-03-10 16:31:25 +05:30
Naman Verma
76b4549504 test: unit tests 2026-03-10 16:16:53 +05:30
Naman Verma
968a5089ff fix: check for tic when finding remaining keys 2026-03-10 16:16:29 +05:30
Naman Verma
c082bc3d76 chore: also sort by remaining group by keys 2026-03-10 15:43:45 +05:30
Naman Verma
59e0dcc865 chore: move order by ts asc out of all if branches 2026-03-10 15:40:26 +05:30
Naman Verma
89840189ef chore: remove comment 2026-03-10 15:16:09 +05:30
Naman Verma
b64a07db02 fix: limit in where subclause was coming as ? 2026-03-10 15:15:46 +05:30
Naman Verma
38d971b3c9 fix: add partition window when ordering by sum 2026-03-10 15:06:29 +05:30
Naman Verma
f8b266ce05 chore: first draft before testing 2026-03-10 14:55:38 +05:30
Naman Verma
20f7562cbc revert: wrong changes in stmt builder (vv wrong) 2026-03-10 13:36:33 +05:30
Naman Verma
29713964ce Merge branch 'main' into nv/6204 2026-03-10 13:33:26 +05:30
Naman Verma
afb252b4f9 Merge branch 'main' into nv/6204 2026-03-06 08:26:09 +05:30
Naman Verma
c808b4d759 fix: consume order by and limit for metrics in clickhouse query 2026-03-05 09:29:23 +05:30
4 changed files with 189 additions and 119 deletions

View File

@@ -185,22 +185,6 @@ func postProcessMetricQuery(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
config := query.Aggregations[0]
spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)
timeSpaceAggOrderBy := fmt.Sprintf("%s(%s(%s))", config.SpaceAggregation.StringValue(), config.TimeAggregation.StringValue(), config.MetricName)
for idx := range query.Order {
if query.Order[idx].Key.Name == spaceAggOrderBy ||
query.Order[idx].Key.Name == timeAggOrderBy ||
query.Order[idx].Key.Name == timeSpaceAggOrderBy {
query.Order[idx].Key.Name = qbtypes.DefaultOrderByKey
}
}
result = q.applySeriesLimit(result, query.Limit, query.Order)
if len(query.Functions) > 0 {
step := query.StepInterval.Duration.Milliseconds()
functions := q.prepareFillZeroArgsWithStep(query.Functions, req, step)

View File

@@ -132,6 +132,14 @@ func GroupByKeys(keys []qbtypes.GroupByKey) []string {
return k
}
func OrderByKeys(keys []qbtypes.OrderBy) []string {
k := []string{}
for _, key := range keys {
k = append(k, "`"+key.Key.Name+"`")
}
return k
}
func FormatValueForContains(value any) string {
if value == nil {
return ""

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"strings"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
@@ -607,8 +608,73 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
sb.Where(rewrittenExpr)
}
}
sb.OrderBy(querybuilder.GroupByKeys(query.GroupBy)...)
sb.OrderBy("ts")
groupByKeys := querybuilder.GroupByKeys(query.GroupBy)
hasOrder := len(query.Order) > 0
hasLimit := query.Limit > 0
hasGroupBy := len(groupByKeys) > 0
if hasOrder && hasLimit {
// order by with limit: add WHERE subquery to restrict to top N distinct key combinations
orderByKeys := querybuilder.OrderByKeys(query.Order)
var orderByClauses []string
for _, o := range query.Order {
orderByClauses = append(orderByClauses, fmt.Sprintf("`%s` %s", o.Key.Name, o.Direction.StringValue()))
}
subSb := sqlbuilder.NewSelectBuilder()
subSb.Select(fmt.Sprintf("DISTINCT %s", orderByKeys[0]))
if len(orderByKeys) > 1 {
subSb.SelectMore(orderByKeys[1:]...)
}
subSb.From("__spatial_aggregation_cte")
subSb.OrderBy(orderByClauses...)
subQ, _ := subSb.BuildWithFlavor(sqlbuilder.ClickHouse)
subQ = fmt.Sprintf("%s LIMIT %d", subQ, query.Limit)
sb.Where(fmt.Sprintf("(%s) IN (%s)", strings.Join(orderByKeys, ", "), subQ))
sb.OrderBy(orderByClauses...)
} else if hasOrder {
// order by without limit: apply order by clauses directly
for _, o := range query.Order {
key := o.Key.Name
if strings.Contains(key, query.Aggregations[0].MetricName) {
sb.OrderBy(fmt.Sprintf("avg(value) OVER (PARTITION BY %s) %s", strings.Join(groupByKeys, ", "), o.Direction.StringValue()))
continue
}
sb.OrderBy(fmt.Sprintf("`%s` %s", o.Key.Name, o.Direction.StringValue()))
}
} else if hasLimit && hasGroupBy {
// limit without order by: default ordering by avg(value)
subSb := sqlbuilder.NewSelectBuilder()
subSb.Select(groupByKeys[0])
if len(groupByKeys) > 1 {
subSb.SelectMore(groupByKeys[1:]...)
}
subSb.From("__spatial_aggregation_cte")
subSb.GroupBy(groupByKeys...)
subSb.OrderBy("avg(value)")
subQ, _ := subSb.BuildWithFlavor(sqlbuilder.ClickHouse)
subQ = fmt.Sprintf("%s LIMIT %d", subQ, query.Limit)
sb.Where(fmt.Sprintf("(%s) IN (%s)", strings.Join(groupByKeys, ", "), subQ))
} else if hasGroupBy {
// grouping without order by or limit: sort by avg(value) DESC with labels as tiebreakers
sb.OrderBy(fmt.Sprintf("avg(value) OVER (PARTITION BY %s) DESC", strings.Join(groupByKeys, ", ")))
}
// add any group-by keys not already in the order-by as tiebreakers
orderKeySet := make(map[string]struct{})
for _, o := range query.Order {
orderKeySet[fmt.Sprintf("`%s`", o.Key.Name)] = struct{}{}
}
for _, g := range groupByKeys {
if _, exists := orderKeySet[g]; !exists {
sb.OrderBy(g)
}
}
sb.OrderBy("ts ASC")
if metricType == metrictypes.HistogramType && spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam == nil {
sb.OrderBy("toFloat64(le)")
}

View File

@@ -15,16 +15,17 @@ import (
)
func TestStatementBuilder(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
expected qbtypes.Statement
expectedErr error
}{
type baseQuery struct {
name string
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
orderKey string
args []any
cte string
}
bases := []baseQuery{
{
name: "test_cumulative_rate_sum",
requestType: qbtypes.RequestTypeTimeSeries,
name: "cumulative_rate_sum",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -40,24 +41,16 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
},
expectedErr: nil,
orderKey: "service.name",
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
},
{
name: "test_cumulative_rate_sum_with_mat_column",
requestType: qbtypes.RequestTypeTimeSeries,
name: "cumulative_rate_sum_with_mat_column",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -73,24 +66,16 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "materialized.key.name REGEXP 'cartservice' OR service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
},
expectedErr: nil,
orderKey: "service.name",
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
},
{
name: "test_delta_rate_sum",
requestType: qbtypes.RequestTypeTimeSeries,
name: "delta_rate_sum",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -106,24 +91,16 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
},
expectedErr: nil,
orderKey: "service.name",
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`)",
},
{
name: "test_histogram_percentile1",
requestType: qbtypes.RequestTypeTimeSeries,
name: "histogram_percentile1",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -139,24 +116,16 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
Args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
},
expectedErr: nil,
orderKey: "service.name",
args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`)",
},
{
name: "test_gauge_avg_sum",
requestType: qbtypes.RequestTypeTimeSeries,
name: "gauge_avg_sum",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -172,24 +141,16 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "host.name = 'big-data-node-1'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "host.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "host.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name`, ts",
Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
},
expectedErr: nil,
orderKey: "host.name",
args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`)",
},
{
name: "test_histogram_percentile2",
requestType: qbtypes.RequestTypeTimeSeries,
name: "histogram_percentile2",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -202,23 +163,69 @@ func TestStatementBuilder(t *testing.T) {
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
},
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
Args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
},
expectedErr: nil,
orderKey: "service.name",
args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`)",
},
}
type variant struct {
name string
limit int
hasOrder bool
}
variants := []variant{
{"with_limits", 10, false},
{"without_limits", 0, false},
{"with_order_by", 0, true},
{"with_order_by_and_limits", 10, true},
}
// expectedFinalSelects maps "base/variant" to the final SELECT portion after the CTE.
// The full expected query is: base.cte + expectedFinalSelects[name]
expectedFinalSelects := map[string]string{
// cumulative_rate_sum
"cumulative_rate_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) ORDER BY `service.name`, ts ASC",
"cumulative_rate_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"cumulative_rate_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
"cumulative_rate_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
// cumulative_rate_sum_with_mat_column
"cumulative_rate_sum_with_mat_column/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) ORDER BY `service.name`, ts ASC",
"cumulative_rate_sum_with_mat_column/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"cumulative_rate_sum_with_mat_column/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
"cumulative_rate_sum_with_mat_column/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
// delta_rate_sum
"delta_rate_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) ORDER BY `service.name`, ts ASC",
"delta_rate_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"delta_rate_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
"delta_rate_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
// histogram_percentile1
"histogram_percentile1/with_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name`, ts ASC",
"histogram_percentile1/without_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"histogram_percentile1/with_order_by": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
"histogram_percentile1/with_order_by_and_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
// gauge_avg_sum
"gauge_avg_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY avg(value) LIMIT 10) ORDER BY `host.name`, ts ASC",
"gauge_avg_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
"gauge_avg_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name` asc, ts ASC",
"gauge_avg_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT DISTINCT `host.name` FROM __spatial_aggregation_cte ORDER BY `host.name` asc LIMIT 10) ORDER BY `host.name` asc, ts ASC",
// histogram_percentile2
"histogram_percentile2/with_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name`, ts ASC",
"histogram_percentile2/without_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"histogram_percentile2/with_order_by": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
"histogram_percentile2/with_order_by_and_limits": " SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT DISTINCT `service.name` FROM __spatial_aggregation_cte ORDER BY `service.name` asc LIMIT 10) GROUP BY `service.name`, ts ORDER BY `service.name` asc, ts ASC",
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
@@ -227,15 +234,13 @@ func TestStatementBuilder(t *testing.T) {
t.Fatalf("failed to load field keys: %v", err)
}
mockMetadataStore.KeysMap = keys
// NOTE: LoadFieldKeysFromJSON doesn't set Materialized field
// for keys, so we have to set it manually here for testing
if _, ok := mockMetadataStore.KeysMap["materialized.key.name"]; ok {
if len(mockMetadataStore.KeysMap["materialized.key.name"]) > 0 {
mockMetadataStore.KeysMap["materialized.key.name"][0].Materialized = true
}
}
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
fl, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
if err != nil {
t.Fatalf("failed to create flagger: %v", err)
}
@@ -245,23 +250,30 @@ func TestStatementBuilder(t *testing.T) {
mockMetadataStore,
fm,
cb,
flagger,
fl,
)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
for _, b := range bases {
for _, v := range variants {
name := b.name + "/" + v.name
t.Run(name, func(t *testing.T) {
q := b.query
q.Limit = v.limit
if v.hasOrder {
q.Order = []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: b.orderKey}},
Direction: qbtypes.OrderDirectionAsc,
},
}
}
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
result, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, qbtypes.RequestTypeTimeSeries, q, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
require.Equal(t, b.cte+expectedFinalSelects[name], result.Query)
require.Equal(t, b.args, result.Args)
})
}
}
}