Compare commits

..

2 Commits

Author SHA1 Message Date
nityanandagohain
9064db9c84 fix: cleanup 2026-03-18 20:12:29 +05:30
nityanandagohain
095becaf93 fix: updated implementation for using trace summary in list view 2026-03-18 19:33:10 +05:30
20 changed files with 776 additions and 4003 deletions

View File

@@ -190,7 +190,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:v0.116.0
image: signoz/signoz:v0.115.0
ports:
- "8080:8080" # signoz port
# - "6060:6060" # pprof port

View File

@@ -117,7 +117,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:v0.116.0
image: signoz/signoz:v0.115.0
ports:
- "8080:8080" # signoz port
volumes:

View File

@@ -181,7 +181,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:${VERSION:-v0.116.0}
image: signoz/signoz:${VERSION:-v0.115.0}
container_name: signoz
ports:
- "8080:8080" # signoz port

View File

@@ -109,7 +109,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:${VERSION:-v0.116.0}
image: signoz/signoz:${VERSION:-v0.115.0}
container_name: signoz
ports:
- "8080:8080" # signoz port

View File

@@ -361,6 +361,10 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
if err != nil {
return nil, err
}
// skip = true means we have indentified from the query itself that we don't need to execute this bucket
if stmt.Skip {
continue
}
warnings = stmt.Warnings
warningsDocURL = stmt.WarningsDocURL
// Execute with proper context for partial value detection

View File

@@ -69,7 +69,6 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
key string // deterministic join of label values
}
seriesMap := map[sKey]*qbtypes.TimeSeries{}
var keyOrder []sKey // preserves ClickHouse row-arrival order
stepMs := uint64(step.Duration.Milliseconds())
@@ -220,7 +219,6 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
if !ok {
series = &qbtypes.TimeSeries{Labels: lblObjs}
seriesMap[key] = series
keyOrder = append(keyOrder, key)
}
series.Values = append(series.Values, &qbtypes.TimeSeriesValue{
Timestamp: ts,
@@ -252,8 +250,8 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
Alias: "__result_" + strconv.Itoa(i),
}
}
for _, k := range keyOrder {
buckets[k.agg].Series = append(buckets[k.agg].Series, seriesMap[k])
for k, s := range seriesMap {
buckets[k.agg].Series = append(buckets[k.agg].Series, s)
}
var nonEmpty []*qbtypes.AggregationBucket

View File

@@ -185,6 +185,22 @@ func postProcessMetricQuery(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
config := query.Aggregations[0]
spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)
timeSpaceAggOrderBy := fmt.Sprintf("%s(%s(%s))", config.SpaceAggregation.StringValue(), config.TimeAggregation.StringValue(), config.MetricName)
for idx := range query.Order {
if query.Order[idx].Key.Name == spaceAggOrderBy ||
query.Order[idx].Key.Name == timeAggOrderBy ||
query.Order[idx].Key.Name == timeSpaceAggOrderBy {
query.Order[idx].Key.Name = qbtypes.DefaultOrderByKey
}
}
result = q.applySeriesLimit(result, query.Limit, query.Order)
if len(query.Functions) > 0 {
step := query.StepInterval.Duration.Milliseconds()
functions := q.prepareFillZeroArgsWithStep(query.Functions, req, step)

View File

@@ -51,7 +51,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747785600000), uint64(1747983420000), "cartservice", "cumulative", 0},
},
expectedErr: nil,
@@ -84,7 +84,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta"},
},
expectedErr: nil,
@@ -117,7 +117,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta", 0},
},
expectedErr: nil,
@@ -150,7 +150,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, avg(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, avg(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name`, ts",
Args: []any{"system.memory.usage", uint64(1747872000000), uint64(1747983420000), "big-data-node-1", "unspecified", 0},
},
expectedErr: nil,

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"strings"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
@@ -540,13 +539,6 @@ func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
func isMetricAggOrderByKey(key string, config qbtypes.MetricAggregation) bool {
spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)
timeSpaceAggOrderBy := fmt.Sprintf("%s(%s(%s))", config.SpaceAggregation.StringValue(), config.TimeAggregation.StringValue(), config.MetricName)
return key == spaceAggOrderBy || key == timeAggOrderBy || key == timeSpaceAggOrderBy
}
func (b *MetricQueryStatementBuilder) BuildFinalSelect(
cteFragments []string,
cteArgs [][]any,
@@ -554,159 +546,73 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
) (*qbtypes.Statement, error) {
metricType := query.Aggregations[0].Type
spaceAgg := query.Aggregations[0].SpaceAggregation
finalCTE := "__spatial_aggregation_cte"
if metricType == metrictypes.HistogramType {
histogramCTE, histogramCTEArgs, err := b.buildHistogramCTE(query)
if err != nil {
return nil, err
}
cteFragments = append(cteFragments, histogramCTE)
cteArgs = append(cteArgs, histogramCTEArgs)
finalCTE = "__histogram_cte"
}
groupByKeys := querybuilder.GroupByKeys(query.GroupBy)
hasGroupBy := len(groupByKeys) > 0
if hasGroupBy {
cteWithAvgColumn := b.buildCTEWithAvgColumn(query, finalCTE)
cteFragments = append(cteFragments, cteWithAvgColumn)
cteWithGroupRankColumn := b.buildCTEWithGroupRank(query)
cteFragments = append(cteFragments, cteWithGroupRankColumn)
finalCTE = "__with_group_rank_cte"
}
combined := querybuilder.CombineCTEs(cteFragments)
var args []any
for _, a := range cteArgs {
args = append(args, a...)
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ts")
if metricType == metrictypes.HistogramType && spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam == nil {
sb.SelectMore("le")
}
sb.SelectMore(groupByKeys...)
sb.SelectMore("value")
sb.From(finalCTE)
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
sb.Where(rewrittenExpr)
}
if hasGroupBy {
sb.OrderBy("group_rank")
if query.Limit > 0 {
sb.Where(fmt.Sprintf("group_rank <= %d", query.Limit))
if metricType == metrictypes.HistogramType && spaceAgg.IsPercentile() {
quantile := query.Aggregations[0].SpaceAggregation.Percentile()
sb.Select("ts")
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
sb.SelectMore(fmt.Sprintf(
"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
quantile,
))
sb.From("__spatial_aggregation_cte")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
sb.GroupBy("ts")
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
sb.Having(rewrittenExpr)
}
} else if metricType == metrictypes.HistogramType && spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam != nil {
sb.Select("ts")
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggQuery, err := AggregationQueryForHistogramCountWithParams(query.Aggregations[0].ComparisonSpaceAggregationParam)
if err != nil {
return nil, err
}
sb.SelectMore(aggQuery)
sb.From("__spatial_aggregation_cte")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
sb.GroupBy("ts")
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
sb.Having(rewrittenExpr)
}
} else {
// for count aggregation on histograms with no params, the exact result of spatial aggregation can be sent forward
sb.Select("*")
sb.From("__spatial_aggregation_cte")
if query.Having != nil && query.Having.Expression != "" {
rewriter := querybuilder.NewHavingExpressionRewriter()
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
sb.Where(rewrittenExpr)
}
}
sb.OrderBy(querybuilder.GroupByKeys(query.GroupBy)...)
sb.OrderBy("ts")
if metricType == metrictypes.HistogramType && spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam == nil {
sb.OrderBy("toFloat64(le)")
}
sb.OrderBy("ts ASC")
q, a := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return &qbtypes.Statement{Query: combined + q, Args: append(args, a...)}, nil
}
func (b *MetricQueryStatementBuilder) buildHistogramCTE(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
) (string, []any, error) {
spaceAgg := query.Aggregations[0].SpaceAggregation
histogramCTEQueryBuilder := sqlbuilder.NewSelectBuilder()
if spaceAgg.IsPercentile() {
histogramCTEQueryBuilder.Select("ts")
for _, g := range query.GroupBy {
histogramCTEQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
quantile := spaceAgg.Percentile()
histogramCTEQueryBuilder.SelectMore(fmt.Sprintf(
"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
quantile,
))
histogramCTEQueryBuilder.From("__spatial_aggregation_cte")
histogramCTEQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
histogramCTEQueryBuilder.GroupBy("ts")
} else if spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam != nil {
histogramCTEQueryBuilder.Select("ts")
for _, g := range query.GroupBy {
histogramCTEQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggQuery, err := AggregationQueryForHistogramCountWithParams(query.Aggregations[0].ComparisonSpaceAggregationParam)
if err != nil {
return "", nil, err
}
histogramCTEQueryBuilder.SelectMore(aggQuery)
histogramCTEQueryBuilder.From("__spatial_aggregation_cte")
histogramCTEQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
histogramCTEQueryBuilder.GroupBy("ts")
} else {
// for count aggregation on histograms with no params, the exact result of spatial aggregation can be sent forward
histogramCTEQueryBuilder.Select("*")
histogramCTEQueryBuilder.From("__spatial_aggregation_cte")
}
histogramQueryCTE, histogramQueryCTEArgs := histogramCTEQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse)
histogramCTE := fmt.Sprintf("__histogram_cte AS (%s)", histogramQueryCTE)
return histogramCTE, histogramQueryCTEArgs, nil
}
/*
this receives a CTE (__spatial_aggregation_cte or __histogram_cte) that has columns ts, value, and a column each for all the group by keys
it creates a CTE (__with_avg_cte) that adds a column avg_val which has the avg value for the group the row belongs in
*/
func (b *MetricQueryStatementBuilder) buildCTEWithAvgColumn(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
latestCTE string,
) string {
withAvgCTEBuilder := sqlbuilder.NewSelectBuilder()
withAvgCTEBuilder.Select("*")
groupByKeys := querybuilder.GroupByKeys(query.GroupBy)
withAvgCTEBuilder.SelectMore(fmt.Sprintf("avgIf(value, isNaN(value) = 0) OVER (PARTITION BY %s) AS avg_val", strings.Join(groupByKeys, ",")))
withAvgCTEBuilder.From(latestCTE)
withAvgCTEQuery, _ := withAvgCTEBuilder.BuildWithFlavor(sqlbuilder.ClickHouse) // no args so second return param is ignored
withAvgCTE := fmt.Sprintf("__with_avg_cte AS (%s)", withAvgCTEQuery)
return withAvgCTE
}
/*
this receives the __with_avg_cte that has columns ts, value, a column each for all the group by keys, and avg_val which has the avg value for the group the row belongs in
it creates a CTE (__with_group_rank_cte) that adds a column group_rank that ranks each group based on the order by keys (or by avg val if there are none)
*/
func (b *MetricQueryStatementBuilder) buildCTEWithGroupRank(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
) string {
withGroupByCTEBuilder := sqlbuilder.NewSelectBuilder()
withGroupByCTEBuilder.Select("*")
windowOrder := []string{}
orderedKeys := map[string]struct{}{} // this will be used to add the remaining keys as tie breakers in the end
if len(query.Order) > 0 {
for _, o := range query.Order {
key := o.Key.Name
if isMetricAggOrderByKey(key, query.Aggregations[0]) {
windowOrder = append(windowOrder, fmt.Sprintf("avg_val %s", o.Direction.StringValue()))
} else {
windowOrder = append(windowOrder, fmt.Sprintf("`%s` %s", key, o.Direction.StringValue()))
orderedKeys[fmt.Sprintf("`%s`", key)] = struct{}{}
}
}
} else {
windowOrder = append(windowOrder, "avg_val DESC")
}
groupByKeys := querybuilder.GroupByKeys(query.GroupBy)
for _, gk := range groupByKeys { // keys that haven't been added via order by keys will be added at the end as tie breakers
if _, ok := orderedKeys[gk]; !ok {
windowOrder = append(windowOrder, fmt.Sprintf("%s ASC", gk))
}
}
withGroupByCTEBuilder.SelectMore(fmt.Sprintf("dense_rank() OVER (ORDER BY %s) AS group_rank", strings.Join(windowOrder, ",")))
withGroupByCTEBuilder.From("__with_avg_cte")
withGroupRankCTEQuery, _ := withGroupByCTEBuilder.BuildWithFlavor(sqlbuilder.ClickHouse) // no args so second return param is ignored
withGroupRankCTE := fmt.Sprintf("__with_group_rank_cte AS (%s)", withGroupRankCTEQuery)
return withGroupRankCTE
}

View File

@@ -15,17 +15,16 @@ import (
)
func TestStatementBuilder(t *testing.T) {
type baseQuery struct {
name string
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
orderKey string
args []any
cte string
}
bases := []baseQuery{
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
expected qbtypes.Statement
expectedErr error
}{
{
name: "cumulative_rate_sum",
name: "test_cumulative_rate_sum",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -41,16 +40,24 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
orderKey: "service.name",
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
},
expectedErr: nil,
},
{
name: "cumulative_rate_sum_with_mat_column",
name: "test_cumulative_rate_sum_with_mat_column",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -66,16 +73,24 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "materialized.key.name REGEXP 'cartservice' OR service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
orderKey: "service.name",
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
},
expectedErr: nil,
},
{
name: "delta_rate_sum",
name: "test_delta_rate_sum",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -91,16 +106,24 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
orderKey: "service.name",
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`)",
expected: qbtypes.Statement{
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
},
expectedErr: nil,
},
{
name: "histogram_percentile1",
name: "test_histogram_percentile1",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -116,38 +139,24 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
orderKey: "service.name",
args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`), __histogram_cte AS (SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts)",
},
{
name: "histogram_percentile2",
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "http_server_duration_bucket",
Type: metrictypes.HistogramType,
Temporality: metrictypes.Cumulative,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
},
},
orderKey: "service.name",
args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`), __histogram_cte AS (SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts)",
expected: qbtypes.Statement{
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
Args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
},
expectedErr: nil,
},
{
name: "gauge_avg_sum",
name: "test_gauge_avg_sum",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
@@ -163,81 +172,51 @@ func TestStatementBuilder(t *testing.T) {
Filter: &qbtypes.Filter{
Expression: "host.name = 'big-data-node-1'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "host.name"}},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "host.name",
},
},
},
},
orderKey: "host.name",
args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
cte: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`)",
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name`, ts",
Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
},
expectedErr: nil,
},
{
name: "test_histogram_percentile2",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "http_server_duration_bucket",
Type: metrictypes.HistogramType,
Temporality: metrictypes.Cumulative,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
},
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
Args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
},
expectedErr: nil,
},
}
type variant struct {
name string
limit int
hasOrder bool
}
variants := []variant{
{"with_limits", 10, false},
{"without_limits", 0, false},
{"with_order_by", 0, true},
{"with_order_by_and_limits", 10, true},
}
sumMetricsFinalSelects := map[string]string{
"with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
"with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
}
histogramMetricsFinalSelects := map[string]string{
"with_limits": " SELECT * FROM __histogram_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __histogram_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"without_limits": " SELECT * FROM __histogram_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
"with_order_by": " SELECT * FROM __histogram_cte ORDER BY `service.name` asc, ts ASC",
"with_order_by_and_limits": " SELECT * FROM __histogram_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __histogram_cte GROUP BY `service.name` ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
}
// expectedFinalSelects maps "base/variant" to the final SELECT portion after the CTE.
// The full expected query is: base.cte + expectedFinalSelects[name]
expectedFinalSelects := map[string]string{
// cumulative_rate_sum
"cumulative_rate_sum/with_limits": sumMetricsFinalSelects["with_limits"],
"cumulative_rate_sum/without_limits": sumMetricsFinalSelects["without_limits"],
"cumulative_rate_sum/with_order_by": sumMetricsFinalSelects["with_order_by"],
"cumulative_rate_sum/with_order_by_and_limits": sumMetricsFinalSelects["with_order_by_and_limits"],
// cumulative_rate_sum_with_mat_column
"cumulative_rate_sum_with_mat_column/with_limits": sumMetricsFinalSelects["with_limits"],
"cumulative_rate_sum_with_mat_column/without_limits": sumMetricsFinalSelects["without_limits"],
"cumulative_rate_sum_with_mat_column/with_order_by": sumMetricsFinalSelects["with_order_by"],
"cumulative_rate_sum_with_mat_column/with_order_by_and_limits": sumMetricsFinalSelects["with_order_by_and_limits"],
// delta_rate_sum
"delta_rate_sum/with_limits": sumMetricsFinalSelects["with_limits"],
"delta_rate_sum/without_limits": sumMetricsFinalSelects["without_limits"],
"delta_rate_sum/with_order_by": sumMetricsFinalSelects["with_order_by"],
"delta_rate_sum/with_order_by_and_limits": sumMetricsFinalSelects["with_order_by_and_limits"],
// histogram_percentile1
"histogram_percentile1/with_limits": histogramMetricsFinalSelects["with_limits"],
"histogram_percentile1/without_limits": histogramMetricsFinalSelects["without_limits"],
"histogram_percentile1/with_order_by": histogramMetricsFinalSelects["with_order_by"],
"histogram_percentile1/with_order_by_and_limits": histogramMetricsFinalSelects["with_order_by_and_limits"],
// histogram_percentile2
"histogram_percentile2/with_limits": histogramMetricsFinalSelects["with_limits"],
"histogram_percentile2/without_limits": histogramMetricsFinalSelects["without_limits"],
"histogram_percentile2/with_order_by": histogramMetricsFinalSelects["with_order_by"],
"histogram_percentile2/with_order_by_and_limits": histogramMetricsFinalSelects["with_order_by_and_limits"],
// gauge_avg_sum
"gauge_avg_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
"gauge_avg_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
"gauge_avg_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name` asc, ts ASC",
"gauge_avg_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY `host.name` asc LIMIT 10) ORDER BY `host.name` asc, ts ASC",
}
fm := NewFieldMapper()
@@ -248,13 +227,15 @@ func TestStatementBuilder(t *testing.T) {
t.Fatalf("failed to load field keys: %v", err)
}
mockMetadataStore.KeysMap = keys
// NOTE: LoadFieldKeysFromJSON doesn't set Materialized field
// for keys, so we have to set it manually here for testing
if _, ok := mockMetadataStore.KeysMap["materialized.key.name"]; ok {
if len(mockMetadataStore.KeysMap["materialized.key.name"]) > 0 {
mockMetadataStore.KeysMap["materialized.key.name"][0].Materialized = true
}
}
fl, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
if err != nil {
t.Fatalf("failed to create flagger: %v", err)
}
@@ -264,30 +245,23 @@ func TestStatementBuilder(t *testing.T) {
mockMetadataStore,
fm,
cb,
fl,
flagger,
)
for _, b := range bases {
for _, v := range variants {
name := b.name + "/" + v.name
t.Run(name, func(t *testing.T) {
q := b.query
q.Limit = v.limit
if v.hasOrder {
q.Order = []qbtypes.OrderBy{
{
Key: qbtypes.OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: b.orderKey}},
Direction: qbtypes.OrderDirectionAsc,
},
}
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
result, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, qbtypes.RequestTypeTimeSeries, q, nil)
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, b.cte+expectedFinalSelects[name], result.Query)
require.Equal(t, b.args, result.Args)
})
}
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}

View File

@@ -121,8 +121,18 @@ func (b *traceQueryStatementBuilder) Build(
if !ok {
b.logger.DebugContext(ctx, "failed to get trace time range", "trace_ids", traceIDs)
} else if traceStart > 0 && traceEnd > 0 {
start = uint64(traceStart)
end = uint64(traceEnd)
// we don't need to query if the start and end are non overlapping
if uint64(traceStart) > end || uint64(traceEnd) < start {
return &qbtypes.Statement{Skip: true}, nil
}
// clamp start/end to trace time range to avoid overlap between buckets
if uint64(traceStart) > start {
start = uint64(traceStart)
}
if uint64(traceEnd) < end {
end = uint64(traceEnd)
}
b.logger.DebugContext(ctx, "optimized time range for traces", "trace_ids", traceIDs, "start", start, "end", end)
}
}

View File

@@ -47,6 +47,7 @@ type Statement struct {
Args []any
Warnings []string
WarningsDocURL string
Skip bool // don't run this query
}
// StatementBuilder builds the query.

View File

@@ -58,8 +58,6 @@ def build_builder_query(
step_interval: int = DEFAULT_STEP_INTERVAL,
group_by: Optional[List[str]] = None,
filter_expression: Optional[str] = None,
order_by: Optional[List[Dict]] = None,
limit: Optional[int] = None,
functions: Optional[List[Dict]] = None,
disabled: bool = False,
) -> Dict:
@@ -95,12 +93,6 @@ def build_builder_query(
if filter_expression:
spec["filter"] = {"expression": filter_expression}
if order_by:
spec["order"] = order_by
if limit is not None:
spec["limit"] = limit
if functions:
spec["functions"] = functions

View File

@@ -696,7 +696,6 @@ def test_traces_list_with_corrupt_data(
assert response.status_code == status_code
if response.status_code == HTTPStatus.OK:
if not results(traces):
# No results expected
assert response.json()["data"]["data"]["results"][0]["rows"] is None
@@ -2026,3 +2025,249 @@ def test_traces_fill_zero_formula_with_group_by(
expected_by_ts=expectations[service_name],
context=f"traces/fillZero/F1/{service_name}",
)
def test_traces_list_filter_by_trace_id(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Tests that filtering by trace_id:
1. Returns the matching span (narrow window, single bucket).
2. Does not return duplicate spans when the query window spans multiple
exponential buckets (>1 h) — regression test for the expand-vs-clamp bug.
3. Returns no results when the query window does not contain the trace.
"""
target_trace_id = TraceIdGenerator.trace_id()
other_trace_id = TraceIdGenerator.trace_id()
span_id_root = TraceIdGenerator.span_id()
other_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "trace-filter-service",
"cloud.provider": "integration",
}
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=10),
duration=timedelta(seconds=5),
trace_id=target_trace_id,
span_id=span_id_root,
parent_span_id="",
name="root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={"http.request.method": "GET"},
),
# span from a different trace — must not appear in results
Traces(
timestamp=now - timedelta(seconds=5),
duration=timedelta(seconds=1),
trace_id=other_trace_id,
span_id=other_span_id,
parent_span_id="",
name="other-root-span",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
trace_filter = f"trace_id = '{target_trace_id}'"
def _query(start_ms: int, end_ms: int) -> List:
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="raw",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"limit": 100,
"offset": 0,
"filter": {"expression": trace_filter},
"order": [{"key": {"name": "timestamp"}, "direction": "desc"}],
"selectFields": [
{
"name": "name",
"fieldDataType": "string",
"fieldContext": "span",
"signal": "traces",
}
],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
return response.json()["data"]["data"]["results"][0]["rows"] or []
now_ms = int(now.timestamp() * 1000)
# --- Test 1: narrow window (single bucket, <1 h) ---
narrow_start_ms = int((now - timedelta(minutes=5)).timestamp() * 1000)
narrow_rows = _query(narrow_start_ms, now_ms)
assert len(narrow_rows) == 1, (
f"Expected 1 span for trace_id filter (narrow window), got {len(narrow_rows)}"
)
assert narrow_rows[0]["data"]["span_id"] == span_id_root
assert narrow_rows[0]["data"]["trace_id"] == target_trace_id
# --- Test 2: wide window (>1 h, triggers multiple exponential buckets) ---
# should just return 1 span, not duplicate
wide_start_ms = int((now - timedelta(hours=12)).timestamp() * 1000)
wide_rows = _query(wide_start_ms, now_ms)
assert len(wide_rows) == 1, (
f"Expected 1 span for trace_id filter (wide window, multi-bucket), "
f"got {len(wide_rows)} — possible duplicate-span regression"
)
assert wide_rows[0]["data"]["span_id"] == span_id_root
assert wide_rows[0]["data"]["trace_id"] == target_trace_id
# --- Test 3: window that does not contain the trace returns no results ---
past_start_ms = int((now - timedelta(hours=6)).timestamp() * 1000)
past_end_ms = int((now - timedelta(hours=2)).timestamp() * 1000)
past_rows = _query(past_start_ms, past_end_ms)
assert len(past_rows) == 0, (
f"Expected 0 spans for trace_id filter outside time window, "
f"got {len(past_rows)}"
)
def test_traces_list_filter_by_trace_id_cross_bucket(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Regression test for multi-bucket trace_id queries.
Setup: a single trace with two spans placed in different exponential buckets.
makeBuckets over a 3-hour window produces (newest-first):
bucket 1: [now-1h, now] ← span_a lives here (now-30min)
bucket 2: [now-3h, now-1h] ← span_b lives here (now-90min)
Without the fix: both buckets expanded their window to [traceStart, traceEnd]
and therefore each returned both spans → 4 rows total (duplicates).
With the fix: each bucket is tied to its intersection with the trace range,
so bucket 1 returns span_a and bucket 2 returns span_b → exactly 2 rows.
"""
trace_id = TraceIdGenerator.trace_id()
span_id_a = TraceIdGenerator.span_id()
span_id_b = TraceIdGenerator.span_id()
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
common_resources = {
"deployment.environment": "production",
"service.name": "cross-bucket-service",
"cloud.provider": "integration",
}
insert_traces(
[
# span_a: sits in bucket 1 [now-1h, now]
Traces(
timestamp=now - timedelta(minutes=30),
duration=timedelta(seconds=1),
trace_id=trace_id,
span_id=span_id_a,
parent_span_id="",
name="span-a",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
# span_b: sits in bucket 2 [now-3h, now-1h]
Traces(
timestamp=now - timedelta(minutes=90),
duration=timedelta(seconds=1),
trace_id=trace_id,
span_id=span_id_b,
parent_span_id=span_id_a,
name="span-b",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources=common_resources,
attributes={},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# 3-hour window forces makeBuckets to create two buckets:
# bucket 1: [now-1h, now]
# bucket 2: [now-3h, now-1h]
start_ms = int((now - timedelta(hours=3)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
response = make_query_request(
signoz,
token,
start_ms=start_ms,
end_ms=end_ms,
request_type="raw",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"limit": 100,
"offset": 0,
"filter": {"expression": f"trace_id = '{trace_id}'"},
"order": [{"key": {"name": "timestamp"}, "direction": "desc"}],
"selectFields": [
{
"name": "name",
"fieldDataType": "string",
"fieldContext": "span",
"signal": "traces",
}
],
},
}
],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
rows = response.json()["data"]["data"]["results"][0]["rows"] or []
assert len(rows) == 2, (
f"Expected exactly 2 spans (one per bucket), got {len(rows)}"
f"likely a duplicate-span regression from bucket window expansion"
)
returned_span_ids = {r["data"]["span_id"] for r in rows}
assert returned_span_ids == {span_id_a, span_id_b}
assert all(r["data"]["trace_id"] == trace_id for r in rows)

View File

@@ -2,20 +2,16 @@
Look at the cumulative_counters_1h.jsonl file for the relevant data
"""
import logging
import os
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Any, Callable, List, Optional, Union
import pytest
from typing import Any, Callable, List
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.metrics import Metrics
from fixtures.querier import (
build_builder_query,
build_order_by,
get_all_series,
get_series_values,
make_query_request,
@@ -75,200 +71,16 @@ def test_rate_with_steady_values_and_reset(
assert v["value"] >= 0, f"Rate should not be negative: {v['value']}"
def _assert_endpoint_rate_values(endpoint_values: dict) -> None:
# /health: 60 data points (t01-t60), steady +10/min
# rate = 10/60 = 0.167
if "/health" in endpoint_values:
health_values = endpoint_values["/health"]
assert (
len(health_values) >= 58
), f"Expected >= 58 values for /health, got {len(health_values)}"
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
assert (
count_steady_health >= 57
), f"Expected >= 57 steady rate values (0.167) for /health, got {count_steady_health}"
# all /health rates should be 0.167 except possibly first/last due to boundaries
for v in health_values[1:-1]:
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
# rate = 20/60 = 0.333, gap causes lower averaged rate at boundary
if "/products" in endpoint_values:
products_values = endpoint_values["/products"]
assert (
len(products_values) >= 49
), f"Expected >= 49 values for /products, got {len(products_values)}"
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
# most values should be 0.333, some boundary values differ due to 10-min gap
assert (
count_steady_products >= 46
), f"Expected >= 46 steady rate values (0.333) for /products, got {count_steady_products}"
# check that non-0.333 values are due to gap averaging (should be lower)
gap_boundary_values = [v["value"] for v in products_values if v["value"] != 0.333]
for val in gap_boundary_values:
assert (
0 < val < 0.333
), f"Gap boundary values should be between 0 and 0.333, got {val}"
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
# normal rate = 1/60 = 0.0167, spike rate = 50/60 = 0.833
if "/checkout" in endpoint_values:
checkout_values = endpoint_values["/checkout"]
assert (
len(checkout_values) >= 59
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
assert (
count_steady_checkout >= 53
), f"Expected >= 53 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
# check that spike values exist (traffic spike +50/min at t40-t44)
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
assert (
count_spike_checkout >= 4
), f"Expected >= 4 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
# spike values should be consecutive
spike_indices = [
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
]
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
for i in range(1, len(spike_indices)):
assert (
spike_indices[i] == spike_indices[i - 1] + 1
), f"Spike indices should be consecutive, got {spike_indices}"
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
# rate = 5/60 = 0.0833
# reset at t31 causes: rate at t30 includes gap (lower), t31 has high rate after reset
if "/orders" in endpoint_values:
orders_values = endpoint_values["/orders"]
assert (
len(orders_values) >= 58
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
assert (
count_steady_orders >= 55
), f"Expected >= 55 steady rate values (0.0833) for /orders, got {count_steady_orders}"
# check for counter reset effects - there should be some non-standard values
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
assert (
len(non_standard_orders) >= 2
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
# post-reset value should be higher (new counter value / interval)
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
assert (
len(high_rate_orders) >= 1
), f"Expected at least one high rate value after counter reset, got {non_standard_orders}"
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
# Rate = 1/60 = 0.0167 during increment, 0 during flat periods
if "/users" in endpoint_values:
users_values = endpoint_values["/users"]
assert (
len(users_values) >= 54
), f"Expected >= 54 values for /users, got {len(users_values)}"
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
# most values should be 0 (flat periods between increments)
assert (
count_zero_users >= 40
), f"Expected >= 40 zero rate values for /users (sparse data), got {count_zero_users}"
# non-zero values should be 0.0167 (1/60 increment rate)
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
assert (
count_increment_rate >= 8
), f"Expected >= 8 increment rate values (0.0167) for /users, got {count_increment_rate}"
@pytest.mark.parametrize(
"metric_suffix,order_by,limit,expected_count,expected_endpoints",
[
(
"no_order",
None, # this is equivalent to sum(metric_name)
None,
5,
["/products", "/health", "/checkout", "/orders", "/users"],
),
(
"only_limit",
None, # this is equivalent to sum(metric_name)
3,
3,
["/products", "/health", "/checkout"],
),
(
"asc",
[build_order_by("endpoint", "asc")],
None,
5,
["/checkout", "/health", "/orders", "/products", "/users"],
),
(
"asc_lim3",
[build_order_by("endpoint", "asc")],
3,
3,
["/checkout", "/health", "/orders"],
),
(
"desc",
[build_order_by("endpoint", "desc")],
None,
5,
["/users", "/products", "/orders", "/health", "/checkout"],
),
(
"desc_lim3",
[build_order_by("endpoint", "desc")],
3,
3,
["/users", "/products", "/orders"],
),
(
"asc_metric_name",
[build_order_by("sum(test_rate_groupby_asc_metric_name)", "asc")],
None,
5,
["/users", "/orders", "/checkout", "/health", "/products"],
),
(
"asc_metric_name_lim3",
[build_order_by("sum(test_rate_groupby_asc_metric_name_lim3)", "asc")],
3,
3,
["/users", "/orders", "/checkout"],
),
(
"desc_metric_name",
[build_order_by("sum(test_rate_groupby_desc_metric_name)", "desc")],
None,
5,
["/products", "/health", "/checkout", "/orders", "/users"],
),
(
"desc_metric_name_lim3",
[build_order_by("sum(test_rate_groupby_desc_metric_name_lim3)", "desc")],
3,
3,
["/products", "/health", "/checkout"],
),
],
)
def test_rate_group_by_endpoint(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
metric_suffix: str,
order_by: Optional[List],
limit: Optional[int],
expected_count: int,
expected_endpoints: Union[set, List[str]],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
metric_name = f"test_rate_groupby_{metric_suffix}"
metric_name = "test_rate_groupby"
metrics = Metrics.load_from_file(
CUMULATIVE_COUNTERS_FILE,
@@ -285,8 +97,6 @@ def test_rate_group_by_endpoint(
"sum",
temporality="cumulative",
group_by=["endpoint"],
order_by=order_by,
limit=limit,
)
response = make_query_request(signoz, token, start_ms, end_ms, [query])
@@ -295,23 +105,10 @@ def test_rate_group_by_endpoint(
data = response.json()
all_series = get_all_series(data, "A")
# Should have 5 different endpoints
assert (
len(all_series) == expected_count
), f"Expected {expected_count} series, got {len(all_series)}"
endpoint_labels = [
series.get("labels", [{}])[0].get("value", "unknown")
for series in all_series
]
if isinstance(expected_endpoints, set):
assert (
set(endpoint_labels) == expected_endpoints
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_labels)}"
else:
assert endpoint_labels == expected_endpoints, (
f"Expected endpoints {expected_endpoints}, got {endpoint_labels}"
)
len(all_series) == 5
), f"Expected 5 series for 5 endpoints, got {len(all_series)}"
# endpoint -> values
endpoint_values = {}
@@ -320,6 +117,11 @@ def test_rate_group_by_endpoint(
values = sorted(series.get("values", []), key=lambda x: x["timestamp"])
endpoint_values[endpoint] = values
expected_endpoints = {"/products", "/health", "/checkout", "/orders", "/users"}
assert (
set(endpoint_values.keys()) == expected_endpoints
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_values.keys())}"
# at no point rate should be negative
for endpoint, values in endpoint_values.items():
for v in values:
@@ -327,4 +129,103 @@ def test_rate_group_by_endpoint(
v["value"] >= 0
), f"Rate for {endpoint} should not be negative: {v['value']}"
_assert_endpoint_rate_values(endpoint_values)
# /health: 60 data points (t01-t60), steady +10/min
# rate = 10/60 = 0.167
health_values = endpoint_values["/health"]
assert (
len(health_values) >= 58
), f"Expected >= 58 values for /health, got {len(health_values)}"
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
assert (
count_steady_health >= 57
), f"Expected >= 57 steady rate values (0.167) for /health, got {count_steady_health}"
# all /health rates should be 0.167 except possibly first/last due to boundaries
for v in health_values[1:-1]:
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
# rate = 20/60 = 0.333, gap causes lower averaged rate at boundary
products_values = endpoint_values["/products"]
assert (
len(products_values) >= 49
), f"Expected >= 49 values for /products, got {len(products_values)}"
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
# most values should be 0.333, some boundary values differ due to 10-min gap
assert (
count_steady_products >= 46
), f"Expected >= 46 steady rate values (0.333) for /products, got {count_steady_products}"
# check that non-0.333 values are due to gap averaging (should be lower)
gap_boundary_values = [v["value"] for v in products_values if v["value"] != 0.333]
for val in gap_boundary_values:
assert (
0 < val < 0.333
), f"Gap boundary values should be between 0 and 0.333, got {val}"
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
# normal rate = 1/60 = 0.0167, spike rate = 50/60 = 0.833
checkout_values = endpoint_values["/checkout"]
assert (
len(checkout_values) >= 59
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
assert (
count_steady_checkout >= 53
), f"Expected >= 53 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
# check that spike values exist (traffic spike +50/min at t40-t44)
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
assert (
count_spike_checkout >= 4
), f"Expected >= 4 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
# spike values should be consecutive
spike_indices = [
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
]
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
# consecutiveness
for i in range(1, len(spike_indices)):
assert (
spike_indices[i] == spike_indices[i - 1] + 1
), f"Spike indices should be consecutive, got {spike_indices}"
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
# rate = 5/60 = 0.0833
# reset at t31 causes: rate at t30 includes gap (lower), t31 has high rate after reset
orders_values = endpoint_values["/orders"]
assert (
len(orders_values) >= 58
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
assert (
count_steady_orders >= 55
), f"Expected >= 55 steady rate values (0.0833) for /orders, got {count_steady_orders}"
# check for counter reset effects - there should be some non-standard values
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
assert (
len(non_standard_orders) >= 2
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
# post-reset value should be higher (new counter value / interval)
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
assert (
len(high_rate_orders) >= 1
), f"Expected at least one high rate value after counter reset, got {non_standard_orders}"
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
# Rate = 1/60 = 0.0167 during increment, 0 during flat periods
users_values = endpoint_values["/users"]
assert (
len(users_values) >= 54
), f"Expected >= 54 values for /users, got {len(users_values)}"
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
# most values should be 0 (flat periods between increments)
assert (
count_zero_users >= 40
), f"Expected >= 40 zero rate values for /users (sparse data), got {count_zero_users}"
# non-zero values should be 0.0167 (1/60 increment rate)
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
assert (
count_increment_rate >= 8
), f"Expected >= 8 increment rate values (0.0167) for /users, got {count_increment_rate}"

View File

@@ -5,7 +5,7 @@ Look at the multi_temporality_counters_1h.jsonl file for the relevant data
import random
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List, Optional, Union
from typing import Callable, List
import pytest
@@ -14,7 +14,6 @@ from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.metrics import Metrics
from fixtures.querier import (
build_builder_query,
build_order_by,
get_all_series,
get_series_values,
make_query_request,
@@ -92,198 +91,6 @@ def test_with_steady_values_and_reset(
), f"{time_aggregation} should not be negative: {v['value']}"
def _assert_endpoint_group_values( # pylint: disable=too-many-arguments
endpoint_values: dict,
stable_health_value: float,
stable_products_value: float,
stable_checkout_value: float,
spike_checkout_value: float,
stable_orders_value: float,
spike_users_value: float,
time_aggregation: str,
) -> None:
# /health: 60 data points (t01-t60), steady +10/min
if "/health" in endpoint_values:
health_values = endpoint_values["/health"]
assert (
len(health_values) >= 58
), f"Expected >= 58 values for /health, got {len(health_values)}"
count_steady_health = sum(
1 for v in health_values if v["value"] == stable_health_value
)
assert (
count_steady_health >= 57
), f"Expected >= 57 steady rate values ({stable_health_value}) for /health, got {count_steady_health}"
# all /health rates should be stable except possibly first/last due to boundaries
for v in health_values[1:-1]:
assert (
v["value"] == stable_health_value
), f"Expected /health rate {stable_health_value}, got {v['value']}"
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
if "/products" in endpoint_values:
products_values = endpoint_values["/products"]
assert (
len(products_values) >= 49
), f"Expected >= 49 values for /products, got {len(products_values)}"
count_steady_products = sum(
1 for v in products_values if v["value"] == stable_products_value
)
# most values should be stable, some boundary values differ due to 10-min gap
assert (
count_steady_products >= 46
), f"Expected >= 46 steady rate values ({stable_products_value}) for /products, got {count_steady_products}"
# check that non-stable values are due to gap averaging (should be lower)
gap_boundary_values = [
v["value"] for v in products_values if v["value"] != stable_products_value
]
for val in gap_boundary_values:
assert (
0 < val < stable_products_value
), f"Gap boundary values should be between 0 and {stable_products_value}, got {val}"
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
if "/checkout" in endpoint_values:
checkout_values = endpoint_values["/checkout"]
assert (
len(checkout_values) >= 59
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
count_steady_checkout = sum(
1 for v in checkout_values if v["value"] == stable_checkout_value
)
assert (
count_steady_checkout >= 53
), f"Expected >= 53 steady {time_aggregation} values ({stable_checkout_value}) for /checkout, got {count_steady_checkout}"
# check that spike values exist (traffic spike +50/min at t40-t44)
count_spike_checkout = sum(
1 for v in checkout_values if v["value"] == spike_checkout_value
)
assert (
count_spike_checkout >= 4
), f"Expected >= 4 spike {time_aggregation} values ({spike_checkout_value}) for /checkout, got {count_spike_checkout}"
# spike values should be consecutive
spike_indices = [
i for i, v in enumerate(checkout_values) if v["value"] == spike_checkout_value
]
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
for i in range(1, len(spike_indices)):
assert (
spike_indices[i] == spike_indices[i - 1] + 1
), f"Spike indices should be consecutive, got {spike_indices}"
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
# reset at t31 causes: rate/increase at t30 includes gap (lower), t31 has high rate after reset
if "/orders" in endpoint_values:
orders_values = endpoint_values["/orders"]
assert (
len(orders_values) >= 58
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
count_steady_orders = sum(
1 for v in orders_values if v["value"] == stable_orders_value
)
assert (
count_steady_orders >= 55
), f"Expected >= 55 steady {time_aggregation} values ({stable_orders_value}) for /orders, got {count_steady_orders}"
# check for counter reset effects - there should be some non-standard values
non_standard_orders = [
v["value"] for v in orders_values if v["value"] != stable_orders_value
]
assert (
len(non_standard_orders) >= 2
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
# post-reset value should be higher (new counter value / interval)
high_rate_orders = [v for v in non_standard_orders if v > stable_orders_value]
assert (
len(high_rate_orders) >= 1
), f"Expected at least one high {time_aggregation} value after counter reset, got {non_standard_orders}"
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
if "/users" in endpoint_values:
users_values = endpoint_values["/users"]
assert (
len(users_values) >= 54
), f"Expected >= 54 values for /users, got {len(users_values)}"
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
# most values should be 0 (flat periods between increments)
assert (
count_zero_users >= 40
), f"Expected >= 40 zero {time_aggregation} values for /users (sparse data), got {count_zero_users}"
# non-zero values should be stable increment rate
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
count_increment_rate = sum(1 for v in non_zero_users if v == spike_users_value)
assert (
count_increment_rate >= 8
), f"Expected >= 8 increment {time_aggregation} values ({spike_users_value}) for /users, got {count_increment_rate}"
@pytest.mark.parametrize(
"order_suffix,order_by_spec,limit,expected_count,expected_endpoints",
[
(
"no_order",
None,
None,
5,
["/products", "/health", "/checkout", "/orders", "/users"],
),
(
"asc",
("endpoint", "asc"),
None,
5,
["/checkout", "/health", "/orders", "/products", "/users"],
),
(
"asc_lim3",
("endpoint", "asc"),
3,
3,
["/checkout", "/health", "/orders"],
),
(
"desc",
("endpoint", "desc"),
None,
5,
["/users", "/products", "/orders", "/health", "/checkout"],
),
(
"desc_lim3",
("endpoint", "desc"),
3,
3,
["/users", "/products", "/orders"],
),
(
"asc_metric_name",
("sum_metric", "asc"),
None,
5,
["/users", "/orders", "/checkout", "/health", "/products"],
),
(
"asc_metric_name_lim3",
("sum_metric", "asc"),
3,
3,
["/users", "/orders", "/checkout"],
),
(
"desc_metric_name",
("sum_metric", "desc"),
None,
5,
["/products", "/health", "/checkout", "/orders", "/users"],
),
(
"desc_metric_name_lim3",
("sum_metric", "desc"),
3,
3,
["/products", "/health", "/checkout"],
),
],
)
@pytest.mark.parametrize(
"time_aggregation, stable_health_value, stable_products_value, stable_checkout_value, spike_checkout_value, stable_orders_value, spike_users_value",
[
@@ -303,24 +110,11 @@ def test_group_by_endpoint(
spike_checkout_value: float,
stable_orders_value: float,
spike_users_value: float,
order_suffix: str,
order_by_spec: Optional[tuple],
limit: Optional[int],
expected_count: int,
expected_endpoints: Union[set, List[str]],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
metric_name = f"test_{time_aggregation}_groupby_{order_suffix}"
# Build order_by at runtime so metric name reflects actual time_aggregation
order_by = None
if order_by_spec is not None:
key, direction = order_by_spec
if key == "sum_metric":
key = f"sum({metric_name})"
order_by = [build_order_by(key, direction)]
metric_name = f"test_{time_aggregation}_groupby"
metrics = Metrics.load_from_file(
MULTI_TEMPORALITY_FILE,
@@ -336,8 +130,6 @@ def test_group_by_endpoint(
time_aggregation,
"sum",
group_by=["endpoint"],
order_by=order_by,
limit=limit,
)
response = make_query_request(signoz, token, start_ms, end_ms, [query])
@@ -345,23 +137,10 @@ def test_group_by_endpoint(
data = response.json()
all_series = get_all_series(data, "A")
# Should have 5 different endpoints
assert (
len(all_series) == expected_count
), f"Expected {expected_count} series, got {len(all_series)}"
endpoint_labels = [
series.get("labels", [{}])[0].get("value", "unknown")
for series in all_series
]
if isinstance(expected_endpoints, set):
assert (
set(endpoint_labels) == expected_endpoints
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_labels)}"
else:
assert endpoint_labels == expected_endpoints, (
f"Expected endpoints {expected_endpoints}, got {endpoint_labels}"
)
len(all_series) == 5
), f"Expected 5 series for 5 endpoints, got {len(all_series)}"
# endpoint -> values
endpoint_values = {}
@@ -370,6 +149,11 @@ def test_group_by_endpoint(
values = sorted(series.get("values", []), key=lambda x: x["timestamp"])
endpoint_values[endpoint] = values
expected_endpoints = {"/products", "/health", "/checkout", "/orders", "/users"}
assert (
set(endpoint_values.keys()) == expected_endpoints
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_values.keys())}"
# at no point rate should be negative
for endpoint, values in endpoint_values.items():
for v in values:
@@ -377,16 +161,117 @@ def test_group_by_endpoint(
v["value"] >= 0
), f"Rate for {endpoint} should not be negative: {v['value']}"
_assert_endpoint_group_values(
endpoint_values,
stable_health_value,
stable_products_value,
stable_checkout_value,
spike_checkout_value,
stable_orders_value,
spike_users_value,
time_aggregation,
# /health: 60 data points (t01-t60), steady +10/min
health_values = endpoint_values["/health"]
assert (
len(health_values) >= 58
), f"Expected >= 58 values for /health, got {len(health_values)}"
count_steady_health = sum(
1 for v in health_values if v["value"] == stable_health_value
)
assert (
count_steady_health >= 57
), f"Expected >= 57 steady rate values ({stable_health_value}) for /health, got {count_steady_health}"
# all /health rates should be state except possibly first/last due to boundaries
for v in health_values[1:-1]:
assert (
v["value"] == stable_health_value
), f"Expected /health rate {stable_health_value}, got {v['value']}"
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
products_values = endpoint_values["/products"]
assert (
len(products_values) >= 49
), f"Expected >= 49 values for /products, got {len(products_values)}"
count_steady_products = sum(
1 for v in products_values if v["value"] == stable_products_value
)
# most values should be stable, some boundary values differ due to 10-min gap
assert (
count_steady_products >= 46
), f"Expected >= 46 steady rate values ({stable_products_value}) for /products, got {count_steady_products}"
# check that non-stable values are due to gap averaging (should be lower)
gap_boundary_values = [
v["value"] for v in products_values if v["value"] != stable_products_value
]
for val in gap_boundary_values:
assert (
0 < val < stable_products_value
), f"Gap boundary values should be between 0 and {stable_products_value}, got {val}"
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
checkout_values = endpoint_values["/checkout"]
assert (
len(checkout_values) >= 59
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
count_steady_checkout = sum(
1 for v in checkout_values if v["value"] == stable_checkout_value
)
assert (
count_steady_checkout >= 53
), f"Expected >= 53 steady {time_aggregation} values ({stable_checkout_value}) for /checkout, got {count_steady_checkout}"
# check that spike values exist (traffic spike +50/min at t40-t44)
count_spike_checkout = sum(
1 for v in checkout_values if v["value"] == spike_checkout_value
)
assert (
count_spike_checkout >= 4
), f"Expected >= 4 spike {time_aggregation} values ({spike_checkout_value}) for /checkout, got {count_spike_checkout}"
# spike values should be consecutive
spike_indices = [
i for i, v in enumerate(checkout_values) if v["value"] == spike_checkout_value
]
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
# consecutiveness
for i in range(1, len(spike_indices)):
assert (
spike_indices[i] == spike_indices[i - 1] + 1
), f"Spike indices should be consecutive, got {spike_indices}"
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
# reset at t31 causes: rate/increase at t30 includes gap (lower), t31 has high rate after reset
orders_values = endpoint_values["/orders"]
assert (
len(orders_values) >= 58
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
count_steady_orders = sum(
1 for v in orders_values if v["value"] == stable_orders_value
)
assert (
count_steady_orders >= 55
), f"Expected >= 55 steady {time_aggregation} values ({stable_orders_value}) for /orders, got {count_steady_orders}"
# check for counter reset effects - there should be some non-standard values
non_standard_orders = [
v["value"] for v in orders_values if v["value"] != stable_orders_value
]
assert (
len(non_standard_orders) >= 2
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
# post-reset value should be higher (new counter value / interval)
high_rate_orders = [v for v in non_standard_orders if v > stable_orders_value]
assert (
len(high_rate_orders) >= 1
), f"Expected at least one high {time_aggregation} value after counter reset, got {non_standard_orders}"
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
users_values = endpoint_values["/users"]
assert (
len(users_values) >= 54
), f"Expected >= 54 values for /users, got {len(users_values)}"
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
# most values should be 0 (flat periods between increments)
assert (
count_zero_users >= 40
), f"Expected >= 40 zero {time_aggregation} values for /users (sparse data), got {count_zero_users}"
# non-zero values should be 0.0167 (1/60 increment rate)
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
count_increment_rate = sum(1 for v in non_zero_users if v == spike_users_value)
assert (
count_increment_rate >= 8
), f"Expected >= 8 increment {time_aggregation} values ({spike_users_value}) for /users, got {count_increment_rate}"
@pytest.mark.parametrize(

View File

@@ -4,7 +4,7 @@ Look at the histogram_data_1h.jsonl file for the relevant data
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List, Optional, Union
from typing import Callable, List
import pytest
@@ -13,16 +13,13 @@ from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.metrics import Metrics
from fixtures.querier import (
build_builder_query,
build_order_by,
get_all_series,
get_series_values,
make_query_request,
)
from fixtures.utils import get_testdata_file_path
FILE = get_testdata_file_path("histogram_data_1h.jsonl")
FILE_WITH_MANY_GROUPS = get_testdata_file_path("histogram_data_1h_many_groups.jsonl")
@pytest.mark.parametrize(
@@ -524,564 +521,4 @@ def test_histogram_percentile_for_delta_service(
assert len(result_values) == 60
assert result_values[0]["value"] == zeroth_value
assert result_values[1]["value"] == first_value
assert result_values[-1]["value"] == last_value
def _assert_series_endpoint_labels(
series: list,
expected_endpoints: Union[set, List[str]],
prefix: str,
) -> None:
labels = [s.get("labels", [{}])[0].get("value", "unknown") for s in series]
if isinstance(expected_endpoints, set):
assert (
set(labels) == expected_endpoints
), f"Expected {prefix} endpoints {expected_endpoints}, got {set(labels)}"
else:
assert labels == expected_endpoints, (
f"Expected {prefix} endpoints in order {expected_endpoints}, got {labels}"
)
@pytest.mark.parametrize(
"order_suffix,order_by,limit,expected_count,expected_endpoints",
[
(
"no_order",
None,
None,
3,
["/checkout", "/health", "/orders"],
),
(
"asc",
[build_order_by("endpoint", "asc")],
None,
3,
["/checkout", "/health", "/orders"],
),
(
"asc_lim2",
[build_order_by("endpoint", "asc")],
2,
2,
["/checkout", "/health"],
),
(
"desc",
[build_order_by("endpoint", "desc")],
None,
3,
["/orders", "/health", "/checkout"],
),
(
"desc_lim2",
[build_order_by("endpoint", "desc")],
2,
2,
["/orders", "/health"],
),
(
"asc_metric_name",
[build_order_by("count(test_histogram_count_groupby_asc_metric_name)", "asc")],
None,
3,
["/health", "/orders", "/checkout"], ## health and orders have the same size so they are then sorted endpoint as a tiebreaker
),
(
"asc_metric_name_lim2",
[build_order_by("count(test_histogram_count_groupby_asc_metric_name_lim2)", "asc")],
2,
2,
["/health", "/orders"],
),
(
"desc_metric_name",
[build_order_by("count(test_histogram_count_groupby_desc_metric_name)", "desc")],
None,
3,
["/checkout", "/health", "/orders"], ## health and orders have the same size so they are then sorted endpoint as a tiebreaker
),
(
"desc_metric_name_lim2",
[build_order_by("count(test_histogram_count_groupby_desc_metric_name_lim2)", "desc")],
2,
2,
["/checkout", "/health"],
),
],
)
def test_histogram_count_group_by_endpoint(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
order_suffix: str,
order_by: Optional[List],
limit: Optional[int],
expected_count: int,
expected_endpoints: Union[set, List[str]],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
metric_name = f"test_histogram_count_groupby_{order_suffix}"
metrics = Metrics.load_from_file(
FILE,
base_time=now - timedelta(minutes=60),
metric_name_override=metric_name,
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
query_count = build_builder_query(
"A",
metric_name,
"increase",
"count",
comparisonSpaceAggregationParam={"threshold": 1000, "operator": "<="},
group_by=["endpoint"],
order_by=order_by,
limit=limit,
)
response = make_query_request(signoz, token, start_ms, end_ms, [query_count])
assert response.status_code == HTTPStatus.OK
data = response.json()
count_all_series = get_all_series(data, "A")
assert (
len(count_all_series) == expected_count
), f"Expected {expected_count} series, got {len(count_all_series)}"
_assert_series_endpoint_labels(count_all_series, expected_endpoints, "count")
count_values = {}
for series in count_all_series:
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
count_values[endpoint] = sorted(
series.get("values", []), key=lambda x: x["timestamp"]
)
for endpoint, values in count_values.items():
for v in values:
assert v["value"] >= 0, f"Count for {endpoint} should not be negative: {v['value']}"
# /health (cumulative, service=api): 59 points, increase starts at 11/min → 69/min
if "/health" in count_values:
vals = count_values["/health"]
assert vals[0]["value"] == 11, f"Expected /health count first=11, got {vals[0]['value']}"
assert vals[-1]["value"] == 69, f"Expected /health count last=69, got {vals[-1]['value']}"
# /orders (cumulative, service=api): same distribution as /health
if "/orders" in count_values:
vals = count_values["/orders"]
assert vals[0]["value"] == 11, f"Expected /orders count first=11, got {vals[0]['value']}"
assert vals[-1]["value"] == 69, f"Expected /orders count last=69, got {vals[-1]['value']}"
# /checkout (delta, service=web): 60 points, zeroth=12345 (raw delta), then 11/min → 69/min
if "/checkout" in count_values:
vals = count_values["/checkout"]
assert vals[0]["value"] == 12345, f"Expected /checkout count zeroth=12345, got {vals[0]['value']}"
assert vals[1]["value"] == 11, f"Expected /checkout count first=11, got {vals[1]['value']}"
assert vals[-1]["value"] == 69, f"Expected /checkout count last=69, got {vals[-1]['value']}"
@pytest.mark.parametrize(
"order_suffix,order_by,limit,expected_count,expected_endpoints",
[
(
"no_order",
None,
None,
4,
[ "/checkout", "/health", "/orders", "/coupon"],
),
(
"only_limit",
None,
1,
1,
[ "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
),
(
"asc",
[build_order_by("endpoint", "asc")],
None,
4,
[ "/checkout", "/coupon", "/health", "/orders"],
),
(
"asc_lim2",
[build_order_by("endpoint", "asc")],
2,
2,
["/checkout", "/coupon"],
),
(
"desc",
[build_order_by("endpoint", "desc")],
None,
4,
["/orders", "/health", "/coupon", "/checkout"],
),
(
"desc_lim2",
[build_order_by("endpoint", "desc")],
2,
2,
["/orders", "/health"],
),
(
"asc_metric_name",
[build_order_by("p75(test_histogram_p75_groupby_asc_metric_name)", "asc")],
None,
4,
["/coupon", "/orders", "/checkout", "/health"], ## health and checkout have the same size so they are then sorted endpoint as a tiebreaker
),
(
"asc_metric_name_lim2",
[build_order_by("p75(test_histogram_p75_groupby_asc_metric_name_lim2)", "asc")],
2,
2,
["/coupon", "/orders"],
),
(
"asc_metric_name_lim3",
[build_order_by("p75(test_histogram_p75_groupby_asc_metric_name_lim3)", "asc")],
3,
3,
["/coupon", "/orders", "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
),
(
"desc_metric_name",
[build_order_by("p75(test_histogram_p75_groupby_desc_metric_name)", "desc")],
None,
4,
[ "/checkout", "/health", "/orders", "/coupon"],
),
(
"desc_metric_name_lim2",
[build_order_by("p75(test_histogram_p75_groupby_desc_metric_name_lim2)", "desc")],
2,
2,
[ "/checkout", "/health"],
),
(
"desc_metric_name_lim2",
[build_order_by("p75(test_histogram_p75_groupby_desc_metric_name_lim2)", "desc")],
1,
1,
[ "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
),
],
)
def test_histogram_percentile_group_by_endpoint(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
order_suffix: str,
order_by: Optional[List],
limit: Optional[int],
expected_count: int,
expected_endpoints: Union[set, List[str]],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
metric_name = f"test_histogram_p75_groupby_{order_suffix}"
metrics = Metrics.load_from_file(
FILE_WITH_MANY_GROUPS,
base_time=now - timedelta(minutes=60),
metric_name_override=metric_name,
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
query_p75 = build_builder_query(
"A",
metric_name,
"doesnotreallymatter",
"p75",
group_by=["endpoint"],
order_by=order_by,
limit=limit,
)
response = make_query_request(signoz, token, start_ms, end_ms, [query_p75])
assert response.status_code == HTTPStatus.OK
data = response.json()
p75_series = get_all_series(data, "A")
for series in p75_series:
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
assert (
len(p75_series) == expected_count
), f"Expected {expected_count} p75 series, got {len(p75_series)}"
_assert_series_endpoint_labels(p75_series, expected_endpoints, "p75")
p75_values = {}
for series in p75_series:
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
p75_values[endpoint] = sorted(
series.get("values", []), key=lambda x: x["timestamp"]
)
for endpoint, values in p75_values.items():
for v in values:
assert v["value"] >= 0, f"p75 for {endpoint} should not be negative: {v['value']}"
# /health (cumulative, service=api)
if "/health" in p75_values:
vals = p75_values["/health"]
assert vals[0]["value"] == 6000, f"Expected /health p75 first=6000, got {vals[0]['value']}"
assert vals[-1]["value"] == 6000, f"Expected /health p75 last=6000, got {vals[-1]['value']}"
# /orders (cumulative, service=api): same distribution as /health
if "/orders" in p75_values:
vals = p75_values["/orders"]
assert vals[0]["value"] == 4500, f"Expected /orders p75 first=4500, got {vals[0]['value']}"
assert vals[-1]["value"] == 4500, f"Expected /orders p75 last=4500, got {vals[-1]['value']}"
# /checkout (delta, service=web): 60 points
if "/checkout" in p75_values:
vals = p75_values["/checkout"]
assert vals[0]["value"] == 6000, f"Expected /checkout p75 zeroth=6000, got {vals[0]['value']}"
assert vals[1]["value"] == 6000, f"Expected /checkout p75 first=6000, got {vals[1]['value']}"
assert vals[-1]["value"] == 6000, f"Expected /checkout p75 last=6000, got {vals[-1]['value']}"
# /coupon (delta, service=web): 60 points
if "/coupon" in p75_values:
vals = p75_values["/coupon"]
assert vals[0]["value"] == 1125, f"Expected /coupon p75 zeroth=1125, got {vals[0]['value']}"
assert vals[1]["value"] == 1125, f"Expected /coupon p75 first=1125, got {vals[1]['value']}"
assert vals[-1]["value"] == 1125, f"Expected /coupon p75 last=1125, got {vals[-1]['value']}"
@pytest.mark.parametrize(
"order_suffix,order_by,limit,expected_count,expected_endpoints, expected_status_codes",
[
(
"no_order",
None,
None,
5,
[ "/checkout", "/health", "/orders", "/coupon", "/coupon"], ## coupon has 200 and 500 status codes so it will appear twice
[ "200", "200", "200", "200", "500"],
),
(
"only_limit",
None,
1,
1,
[ "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
[ "200"]
),
(
"asc_endpoint",
[build_order_by("endpoint", "asc")],
None,
5,
[ "/checkout", "/coupon", "/coupon", "/health", "/orders"],
[ "200", "200", "500", "200", "200"],
),
(
"asc_endpoint_status_code",
[build_order_by("endpoint", "asc"), build_order_by("status_code", "asc")],
None,
5,
[ "/checkout", "/coupon", "/coupon", "/health", "/orders"],
[ "200", "200", "500", "200", "200"],
),
(
"asc_status_code_endpoint",
[build_order_by("status_code", "asc"), build_order_by("endpoint", "asc")],
None,
5,
[ "/checkout", "/coupon", "/health", "/orders", "/coupon"],
[ "200", "200", "200", "200", "500"],
),
(
"asc_endpoint_limit_2",
[build_order_by("endpoint", "asc")],
2,
2,
[ "/checkout", "/coupon"],
[ "200", "200"],
),
(
"asc_endpoint_status_code_limit_2",
[build_order_by("endpoint", "asc"), build_order_by("status_code", "asc")],
2,
2,
[ "/checkout", "/coupon"],
[ "200", "200"],
),
(
"asc_status_code_endpoint_limit_4",
[build_order_by("status_code", "asc"), build_order_by("endpoint", "asc")],
4,
4,
[ "/checkout", "/coupon", "/health", "/orders"],
[ "200", "200", "200", "200"],
),
(
"desc_endpoint",
[build_order_by("endpoint", "desc")],
None,
5,
["/orders", "/health", "/coupon", "/coupon", "/checkout"],
[ "200", "200", "200", "500", "200"],
),
(
"desc_endpoint_status_code",
[build_order_by("endpoint", "desc"), build_order_by("status_code", "desc")],
None,
5,
["/orders", "/health", "/coupon", "/coupon", "/checkout"],
[ "200", "200", "500", "200", "200"],
),
(
"desc_status_code_endpoint",
[build_order_by("status_code", "desc"), build_order_by("endpoint", "desc")],
None,
5,
["/coupon", "/orders", "/health", "/coupon", "/checkout"],
[ "500", "200", "200", "200", "200"],
),
(
"desc_endpoint_limit2",
[build_order_by("endpoint", "desc")],
3,
3,
["/orders", "/health", "/coupon"],
[ "200", "200", "200"],
),
(
"desc_endpoint_status_code_limit3",
[build_order_by("endpoint", "desc"), build_order_by("status_code", "desc")],
3,
3,
["/orders", "/health", "/coupon"],
[ "200", "200", "500"],
),
(
"desc_status_code_endpoint_limit2",
[build_order_by("status_code", "desc"), build_order_by("endpoint", "desc")],
2,
2,
["/coupon", "/orders"],
[ "500", "200"],
),
],
)
def test_histogram_percentile_group_by_endpoint_and_status_code(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
order_suffix: str,
order_by: Optional[List],
limit: Optional[int],
expected_count: int,
expected_endpoints: Union[set, List[str]],
expected_status_codes: Union[set, List[str]],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
metric_name = f"test_histogram_p75_groupby_{order_suffix}"
metrics = Metrics.load_from_file(
FILE_WITH_MANY_GROUPS,
base_time=now - timedelta(minutes=60),
metric_name_override=metric_name,
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
query_p75 = build_builder_query(
"A",
metric_name,
"doesnotreallymatter",
"p75",
group_by=["endpoint", "status_code"],
order_by=order_by,
limit=limit,
)
response = make_query_request(signoz, token, start_ms, end_ms, [query_p75])
assert response.status_code == HTTPStatus.OK
data = response.json()
p75_series = get_all_series(data, "A")
for series in p75_series:
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
status_code = series.get("labels", [{}])[1].get("value", "unknown")
assert (
len(p75_series) == expected_count
), f"Expected {expected_count} p75 series, got {len(p75_series)}"
_assert_series_endpoint_labels(p75_series, expected_endpoints, "p75")
endpoints = [s.get("labels", [{}])[0].get("value", "unknown") for s in p75_series]
assert endpoints == expected_endpoints, (
f"Expected p75 endpoints in order {expected_endpoints}, got {endpoints}"
)
status_codes = [s.get("labels", [{}])[1].get("value", "unknown") for s in p75_series]
assert status_codes == expected_status_codes, (
f"Expected p75 endpoints in order {expected_status_codes}, got {status_codes}"
)
p75_values = {}
for series in p75_series:
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
status_code = series.get("labels", [{}])[1].get("value", "unknown")
p75_values[endpoint+status_code] = sorted(
series.get("values", []), key=lambda x: x["timestamp"]
)
for endpoint, values in p75_values.items():
for v in values:
assert v["value"] >= 0, f"p75 for {endpoint} should not be negative: {v['value']}"
# /health (cumulative, service=api)
if "/health200" in p75_values:
vals = p75_values["/health200"]
assert vals[0]["value"] == 6000, f"Expected /health p75 first=6000, got {vals[0]['value']}"
assert vals[-1]["value"] == 6000, f"Expected /health p75 last=6000, got {vals[-1]['value']}"
# /orders (cumulative, service=api): same distribution as /health
if "/orders200" in p75_values:
vals = p75_values["/orders200"]
assert vals[0]["value"] == 4500, f"Expected /orders p75 first=4500, got {vals[0]['value']}"
assert vals[-1]["value"] == 4500, f"Expected /orders p75 last=4500, got {vals[-1]['value']}"
# /checkout (delta, service=web): 60 points
if "/checkout200" in p75_values:
vals = p75_values["/checkout200"]
assert vals[0]["value"] == 6000, f"Expected /checkout p75 zeroth=6000, got {vals[0]['value']}"
assert vals[1]["value"] == 6000, f"Expected /checkout p75 first=6000, got {vals[1]['value']}"
assert vals[-1]["value"] == 6000, f"Expected /checkout p75 last=6000, got {vals[-1]['value']}"
# /coupon (delta, service=web): 60 points
if "/coupon200" in p75_values:
vals = p75_values["/coupon200"]
assert vals[0]["value"] == 1250, f"Expected /coupon200 p75 zeroth=1250, got {vals[0]['value']}"
assert vals[1]["value"] == 1250, f"Expected /coupon200 p75 first=1250, got {vals[1]['value']}"
assert vals[-1]["value"] == 1250, f"Expected /coupon200 p75 last=1250, got {vals[-1]['value']}"
if "/coupon500" in p75_values:
vals = p75_values["/coupon500"]
assert vals[0]["value"] == 750, f"Expected /coupon500 p75 zeroth=750, got {vals[0]['value']}"
assert vals[1]["value"] == 750, f"Expected /coupon500 p75 first=750, got {vals[1]['value']}"
assert vals[-1]["value"] == 750, f"Expected /coupon500 p75 last=750, got {vals[-1]['value']}"
assert result_values[-1]["value"] == last_value

View File

@@ -1,6 +1,6 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, List, Optional, Union
from typing import Callable, List
import pytest
@@ -9,8 +9,6 @@ from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.metrics import Metrics
from fixtures.querier import (
build_builder_query,
build_order_by,
get_all_series,
get_series_values,
make_query_request,
)
@@ -141,137 +139,3 @@ def test_for_multiple_aggregations(
assert result_values[19]["value"] == twentieth_min_val
assert result_values[20]["value"] == twenty_first_min_val
assert result_values[30]["value"] == thirty_first_min_val
@pytest.mark.parametrize(
"metric_suffix,order_by,limit,expected_count,expected_services",
[
(
"no_order",
None, # default ordering: desc by avg of all metric values for a group
None,
3,
["lab", "web", "api"], # sum of all values: lab=42000, api=36000, web=34000. avg of all sums: lab=700, api=600, web=680
),
(
"only_limit",
None,
2,
2,
["lab", "web"], # top 2 by default desc: lab=42000, api=36000
),
(
"asc",
[build_order_by("service", "asc")],
None,
3,
["api", "lab", "web"],
),
(
"asc_lim2",
[build_order_by("service", "asc")],
2,
2,
["api", "lab"],
),
(
"desc",
[build_order_by("service", "desc")],
None,
3,
["web", "lab", "api"],
),
(
"desc_lim2",
[build_order_by("service", "desc")],
2,
2,
["web", "lab"],
),
(
"asc_metric_name",
[build_order_by("sum(test_gauge_groupby_asc_metric_name)", "asc")],
None,
3,
["api", "web", "lab"],
),
(
"asc_metric_name_lim2",
[build_order_by("sum(test_gauge_groupby_asc_metric_name_lim2)", "asc")],
2,
2,
["api", "web"],
),
(
"desc_metric_name",
[build_order_by("sum(test_gauge_groupby_desc_metric_name)", "desc")],
None,
3,
["lab", "web", "api"],
),
(
"desc_metric_name_lim2",
[build_order_by("sum(test_gauge_groupby_desc_metric_name_lim2)", "desc")],
2,
2,
["lab", "web"],
),
],
)
def test_gauge_group_by_service(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
metric_suffix: str,
order_by: Optional[List],
limit: Optional[int],
expected_count: int,
expected_services: Union[set, List[str]],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
metric_name = f"test_gauge_groupby_{metric_suffix}"
metrics = Metrics.load_from_file(
FILE,
base_time=now - timedelta(minutes=60),
metric_name_override=metric_name,
)
insert_metrics(metrics)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
query = build_builder_query(
"A",
metric_name,
"max",
"sum",
group_by=["service"],
order_by=order_by,
limit=limit,
)
response = make_query_request(signoz, token, start_ms, end_ms, [query])
assert response.status_code == HTTPStatus.OK
data = response.json()
all_series = get_all_series(data, "A")
assert (
len(all_series) == expected_count
), f"Expected {expected_count} series, got {len(all_series)}"
service_labels = [
series.get("labels", [{}])[0].get("value", "unknown")
for series in all_series
]
if isinstance(expected_services, set):
assert (
set(service_labels) == expected_services
), f"Expected services {expected_services}, got {set(service_labels)}"
else:
assert service_labels == expected_services, (
f"Expected services {expected_services}, got {service_labels}"
)

View File

@@ -5,16 +5,13 @@ Look at the delta_counters_1h.jsonl file for the relevant data
import os
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Any, Callable, List, Optional, Union
import pytest
from typing import Any, Callable, List
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.metrics import Metrics
from fixtures.querier import (
build_builder_query,
build_order_by,
get_all_series,
get_series_values,
make_query_request,
@@ -72,61 +69,16 @@ def test_rate_with_steady_values_and_reset(
assert v["value"] >= 0, f"Rate should not be negative: {v['value']}"
@pytest.mark.parametrize(
"order_suffix,order_by,limit,expected_count,expected_endpoints",
[
(
"no_order",
None,
None,
5,
{"/products", "/health", "/checkout", "/orders", "/users"},
),
(
"asc",
[build_order_by("endpoint", "asc")],
None,
5,
["/checkout", "/health", "/orders", "/products", "/users"],
),
(
"asc_lim3",
[build_order_by("endpoint", "asc")],
3,
3,
["/checkout", "/health", "/orders"],
),
(
"desc",
[build_order_by("endpoint", "desc")],
None,
5,
["/users", "/products", "/orders", "/health", "/checkout"],
),
(
"desc_lim3",
[build_order_by("endpoint", "desc")],
3,
3,
["/users", "/products", "/orders"],
),
],
)
def test_rate_group_by_endpoint(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_metrics: Callable[[List[Metrics]], None],
order_suffix: str,
order_by: Optional[List],
limit: Optional[int],
expected_count: int,
expected_endpoints: Union[set, List[str]],
) -> None:
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
metric_name = f"test_rate_groupby_{order_suffix}"
metric_name = "test_rate_groupby"
metrics = Metrics.load_from_file(
DELTA_COUNTERS_FILE,
@@ -142,8 +94,6 @@ def test_rate_group_by_endpoint(
"rate",
"sum",
group_by=["endpoint"],
order_by=order_by,
limit=limit,
)
response = make_query_request(signoz, token, start_ms, end_ms, [query])
@@ -152,23 +102,10 @@ def test_rate_group_by_endpoint(
data = response.json()
all_series = get_all_series(data, "A")
# Should have 5 different endpoints
assert (
len(all_series) == expected_count
), f"Expected {expected_count} series, got {len(all_series)}"
endpoint_labels = [
series.get("labels", [{}])[0].get("value", "unknown")
for series in all_series
]
if isinstance(expected_endpoints, set):
assert (
set(endpoint_labels) == expected_endpoints
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_labels)}"
else:
assert endpoint_labels == expected_endpoints, (
f"Expected endpoints {expected_endpoints}, got {endpoint_labels}"
)
len(all_series) == 5
), f"Expected 5 series for 5 endpoints, got {len(all_series)}"
# endpoint -> values
endpoint_values = {}
@@ -177,6 +114,11 @@ def test_rate_group_by_endpoint(
values = sorted(series.get("values", []), key=lambda x: x["timestamp"])
endpoint_values[endpoint] = values
expected_endpoints = {"/products", "/health", "/checkout", "/orders", "/users"}
assert (
set(endpoint_values.keys()) == expected_endpoints
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_values.keys())}"
# at no point rate should be negative
for endpoint, values in endpoint_values.items():
for v in values:
@@ -186,95 +128,93 @@ def test_rate_group_by_endpoint(
# /health: 60 data points (t01-t60), steady +10/min
# rate = 10/60 = 0.167
if "/health" in endpoint_values:
health_values = endpoint_values["/health"]
assert (
len(health_values) == 60
), f"Expected 60 values for /health, got {len(health_values)}"
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
assert (
count_steady_health == 60
), f"Expected == 60 steady rate values (0.167) for /health, got {count_steady_health}"
# all /health rates should be 0.167 except possibly first/last due to boundaries
for v in health_values[1:-1]:
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
health_values = endpoint_values["/health"]
assert (
len(health_values) == 60
), f"Expected 60 values for /health, got {len(health_values)}"
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
assert (
count_steady_health == 60
), f"Expected == 60 steady rate values (0.167) for /health, got {count_steady_health}"
# all /health rates should be 0.167 except possibly first/last due to boundaries
for v in health_values[1:-1]:
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
# rate = 20/60 = 0.333, gap causes lower averaged rate at boundary
if "/products" in endpoint_values:
products_values = endpoint_values["/products"]
assert (
len(products_values) == 51
), f"Expected 51 values for /products, got {len(products_values)}"
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
assert (
count_steady_products == 51
), f"Expected 51 steady rate values (0.333) for /products, got {count_steady_products}"
products_values = endpoint_values["/products"]
assert (
len(products_values) == 51
), f"Expected 51 values for /products, got {len(products_values)}"
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
assert (
count_steady_products == 51
), f"Expected 51 steady rate values (0.333) for /products, got {count_steady_products}"
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
# normal rate = 1/60 = 0.0167, spike rate = 50/60 = 0.833
if "/checkout" in endpoint_values:
checkout_values = endpoint_values["/checkout"]
checkout_values = endpoint_values["/checkout"]
assert (
len(checkout_values) == 61
), f"Expected 61 values for /checkout, got {len(checkout_values)}"
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
assert (
count_steady_checkout == 56
), f"Expected 56 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
# check that spike values exist (traffic spike +50/min at t40-t44)
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
assert (
count_spike_checkout == 5
), f"Expected 5 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
# spike values should be consecutive
spike_indices = [
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
]
assert len(spike_indices) == 5, f"Expected 5 spike indices, got {spike_indices}"
# consecutiveness
for i in range(1, len(spike_indices)):
assert (
len(checkout_values) == 61
), f"Expected 61 values for /checkout, got {len(checkout_values)}"
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
assert (
count_steady_checkout == 56
), f"Expected 56 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
# check that spike values exist (traffic spike +50/min at t40-t44)
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
assert (
count_spike_checkout == 5
), f"Expected 5 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
# spike values should be consecutive
spike_indices = [
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
]
assert len(spike_indices) == 5, f"Expected 5 spike indices, got {spike_indices}"
for i in range(1, len(spike_indices)):
assert (
spike_indices[i] == spike_indices[i - 1] + 1
), f"Spike indices should be consecutive, got {spike_indices}"
spike_indices[i] == spike_indices[i - 1] + 1
), f"Spike indices should be consecutive, got {spike_indices}"
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
# rate = 5/60 = 0.0833
# reset at t31 causes: rate at t30 includes gap (lower), t31 has high rate after reset
if "/orders" in endpoint_values:
orders_values = endpoint_values["/orders"]
assert (
len(orders_values) == 60
), f"Expected 60 values for /orders, got {len(orders_values)}"
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
assert (
count_steady_orders == 58
), f"Expected 58 steady rate values (0.0833) for /orders, got {count_steady_orders}"
# check for counter reset effects - there should be some non-standard values
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
assert (
len(non_standard_orders) == 2
), f"Expected 2 non-standard values due to counter reset, got {non_standard_orders}"
# post-reset value should be higher (new counter value / interval)
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
assert (
len(high_rate_orders) == 1
), f"Expected one high rate value after counter reset, got {non_standard_orders}"
orders_values = endpoint_values["/orders"]
assert (
len(orders_values) == 60
), f"Expected 59 values for /orders, got {len(orders_values)}"
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
assert (
count_steady_orders == 58
), f"Expected 58 steady rate values (0.0833) for /orders, got {count_steady_orders}"
# check for counter reset effects - there should be some non-standard values
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
assert (
len(non_standard_orders) == 2
), f"Expected 2 non-standard values due to counter reset, got {non_standard_orders}"
# post-reset value should be higher (new counter value / interval)
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
assert (
len(high_rate_orders) == 1
), f"Expected one high rate value after counter reset, got {non_standard_orders}"
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes (12 of them)
# Rate = 1/60 = 0.0167 during increment, 0 during flat periods
if "/users" in endpoint_values:
users_values = endpoint_values["/users"]
assert (
len(users_values) == 56
), f"Expected 56 values for /users, got {len(users_values)}"
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
# most values should be 0 (flat periods between increments)
assert (
count_zero_users == 44
), f"Expected 44 zero rate values for /users (sparse data), got {count_zero_users}"
# non-zero values should be 0.0167 (1/60 increment rate)
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
assert (
count_increment_rate == 12
), f"Expected 12 increment rate values (0.0167) for /users, got {count_increment_rate}"
users_values = endpoint_values["/users"]
assert (
len(users_values) == 56
), f"Expected 56 values for /users, got {len(users_values)}"
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
# most values should be 0 (flat periods between increments)
assert (
count_zero_users == 44
), f"Expected 44 zero rate values for /users (sparse data), got {count_zero_users}"
# non-zero values should be 0.0167 (1/60 increment rate)
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
assert (
count_increment_rate == 12
), f"Expected 12 increment rate values (0.0167) for /users, got {count_increment_rate}"

File diff suppressed because it is too large Load Diff