Compare commits

...

2 Commits

Author SHA1 Message Date
Tushar Vats
e5a8877018 fix: integration tests 2026-06-29 15:18:57 +05:30
Tushar Vats
1f6b818950 fix: gracefully limit group by queries 2026-06-29 13:31:42 +05:30
8 changed files with 524 additions and 0 deletions

View File

@@ -0,0 +1,147 @@
package querier
import (
"fmt"
"slices"
"strings"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
var implicitLimitWarn = "Query %s returned too many results; output has been limited to %d. Add an explicit limit to the query to override this."
// enforceImplicitLimit trims each query that QueryRangeRequest.Normalize capped down to its
// implicit limit, honoring its order, and returns a warning per trimmed query.
func enforceImplicitLimit(results map[string]any, requestType qbtypes.RequestType, queries []qbtypes.QueryEnvelope) []string {
var warnings []string
for idx := range queries {
qe := &queries[idx]
limit := qe.GetImplicitLimit()
if limit == 0 {
continue
}
name := qe.GetQueryName()
result, ok := results[name]
if !ok {
continue
}
if requestType.IsAggregation() {
if trimGroupsToLimit(result, limit, qe.GetOrder()) {
warnings = append(warnings, fmt.Sprintf(implicitLimitWarn, name, limit))
}
}
}
return warnings
}
// trimGroupsToLimit clips a result to at most limit groups and reports whether it had to trim.
func trimGroupsToLimit(result any, limit int, orderBy []qbtypes.OrderBy) bool {
switch value := result.(type) {
case *qbtypes.TimeSeriesData:
if len(value.Aggregations) == 0 || len(value.Aggregations[0].Series) <= limit {
return false
}
// consume yields series in map (non-deterministic) order, so re-derive the CTE's top-N by
// the query order: aggregation keys rank by series total (not per-bucket average), group
// keys by label. All buckets share the same group set, so pick survivors once and filter.
ranked := append([]*qbtypes.TimeSeries(nil), value.Aggregations[0].Series...)
slices.SortStableFunc(ranked, func(a, b *qbtypes.TimeSeries) int {
return compareSeriesForTrim(a, b, orderBy)
})
keep := make(map[string]struct{}, limit)
for _, s := range ranked[:limit] {
keep[qbtypes.GetUniqueSeriesKey(s.Labels)] = struct{}{}
}
for _, bucket := range value.Aggregations {
kept := make([]*qbtypes.TimeSeries, 0, len(keep))
for _, s := range bucket.Series {
if _, ok := keep[qbtypes.GetUniqueSeriesKey(s.Labels)]; ok {
kept = append(kept, s)
}
}
bucket.Series = kept
}
return true
case *qbtypes.ScalarData:
if len(value.Data) <= limit {
return false
}
// Rows arrive SQL-ordered; re-sort by the aggregation only when no order was given.
if len(orderBy) == 0 {
sortByFirstAggregation(value.Data, value.Columns)
}
value.Data = value.Data[:limit]
return true
}
return false
}
// compareSeriesForTrim orders two series by the query order for trimming: a group by key is
// compared by its label value, anything else (an aggregation) by series total. With no order it
// falls back to total descending.
func compareSeriesForTrim(a, b *qbtypes.TimeSeries, orderBy []qbtypes.OrderBy) int {
if len(orderBy) == 0 {
return cmpFloat(seriesTotal(b), seriesTotal(a))
}
for _, o := range orderBy {
var c int
if va, ok := seriesLabelValue(a, o.Key.Name); ok {
vb, _ := seriesLabelValue(b, o.Key.Name)
c = strings.Compare(va, vb)
} else {
c = cmpFloat(seriesTotal(a), seriesTotal(b))
}
if c != 0 {
if o.Direction == qbtypes.OrderDirectionDesc {
return -c
}
return c
}
}
return 0
}
// seriesTotal sums a series' evaluable values — the per-group total the limit CTE ranks by.
func seriesTotal(s *qbtypes.TimeSeries) float64 {
var total float64
for _, v := range s.EvaluableValues() {
total += v.Value
}
return total
}
// seriesLabelValue returns the string value of the series label with the given key name.
func seriesLabelValue(s *qbtypes.TimeSeries, name string) (string, bool) {
for _, l := range s.Labels {
if l.Key.Name == name {
if str, ok := l.Value.(string); ok {
return str, true
}
return fmt.Sprintf("%v", l.Value), true
}
}
return "", false
}
func cmpFloat(a, b float64) int {
switch {
case a < b:
return -1
case a > b:
return 1
default:
return 0
}
}

View File

@@ -0,0 +1,138 @@
package querier
import (
"testing"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/stretchr/testify/require"
)
func tsWithValue(label string, value float64) *qbtypes.TimeSeries {
return &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "body.k"}, Value: label}},
Values: []*qbtypes.TimeSeriesValue{{Timestamp: 1, Value: value}},
}
}
func tsWithValues(label string, values ...float64) *qbtypes.TimeSeries {
pts := make([]*qbtypes.TimeSeriesValue, len(values))
for i, v := range values {
pts[i] = &qbtypes.TimeSeriesValue{Timestamp: int64(i + 1), Value: v}
}
return &qbtypes.TimeSeries{
Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "body.k"}, Value: label}},
Values: pts,
}
}
func seriesLabel(s *qbtypes.TimeSeries) string { return s.Labels[0].Value.(string) }
// A is spread thin (total 5, avg ~1.25); B/C are concentrated (avg 3, 2). Ranking by average
// would drop the high-total A. The fix keeps the top groups by total: A and B.
func TestTrimGroupsToLimit_TimeSeriesKeepsTopByTotalNotAverage(t *testing.T) {
tsd := &qbtypes.TimeSeriesData{
Aggregations: []*qbtypes.AggregationBucket{{
Series: []*qbtypes.TimeSeries{
tsWithValues("A", 1, 1, 1, 1, 1), // total 5, avg 1
tsWithValue("B", 3), // total 3, avg 3
tsWithValue("C", 2), // total 2, avg 2
},
}},
}
trimmed := trimGroupsToLimit(tsd, 2, nil)
require.True(t, trimmed)
kept := []string{seriesLabel(tsd.Aggregations[0].Series[0]), seriesLabel(tsd.Aggregations[0].Series[1])}
require.ElementsMatch(t, []string{"A", "B"}, kept, "must keep top-2 by total (A=5, B=3), not by average")
}
// Series in ascending value order: a naive Series[:limit] keeps the lowest; the trim must not.
func TestTrimGroupsToLimit_TimeSeriesKeepsTopByValue(t *testing.T) {
tsd := &qbtypes.TimeSeriesData{
Aggregations: []*qbtypes.AggregationBucket{{
Series: []*qbtypes.TimeSeries{
tsWithValue("a", 1),
tsWithValue("b", 2),
tsWithValue("c", 3),
},
}},
}
trimmed := trimGroupsToLimit(tsd, 2, nil)
require.True(t, trimmed)
require.Len(t, tsd.Aggregations[0].Series, 2)
kept := []float64{
tsd.Aggregations[0].Series[0].Values[0].Value,
tsd.Aggregations[0].Series[1].Values[0].Value,
}
require.ElementsMatch(t, []float64{3, 2}, kept, "must keep the top-2 by value, not an arbitrary subset")
}
func TestTrimGroupsToLimit_ScalarKeepsTopByValueViaSeriesLimit(t *testing.T) {
// Rows not in value order: the trim must re-sort and keep b=3, c=2, not the leading rows.
sd := &qbtypes.ScalarData{
Columns: []*qbtypes.ColumnDescriptor{
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "body.k"}, Type: qbtypes.ColumnTypeGroup},
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "__result_0"}, Type: qbtypes.ColumnTypeAggregation},
},
Data: [][]any{{"a", 1.0}, {"b", 3.0}, {"c", 2.0}},
}
trimmed := trimGroupsToLimit(sd, 2, nil)
require.True(t, trimmed)
require.Len(t, sd.Data, 2)
require.Equal(t, []any{"b", 3.0}, sd.Data[0])
require.Equal(t, []any{"c", 2.0}, sd.Data[1])
}
func TestTrimGroupsToLimit_NoTrimWhenUnderLimit(t *testing.T) {
tsd := &qbtypes.TimeSeriesData{
Aggregations: []*qbtypes.AggregationBucket{{
Series: []*qbtypes.TimeSeries{tsWithValue("a", 1), tsWithValue("b", 2)},
}},
}
require.False(t, trimGroupsToLimit(tsd, 5, nil))
require.Len(t, tsd.Aggregations[0].Series, 2)
}
func TestEnforceImplicitLimit_WarnsAndTrimsOnlyLimitedQueries(t *testing.T) {
results := map[string]any{
"A": &qbtypes.TimeSeriesData{QueryName: "A", Aggregations: []*qbtypes.AggregationBucket{{
Series: []*qbtypes.TimeSeries{tsWithValue("a", 1), tsWithValue("b", 2), tsWithValue("c", 3)},
}}},
"B": &qbtypes.TimeSeriesData{QueryName: "B", Aggregations: []*qbtypes.AggregationBucket{{
Series: []*qbtypes.TimeSeries{tsWithValue("x", 9), tsWithValue("y", 8), tsWithValue("z", 7)},
}}},
}
// Only A is marked as capped; B must be left untouched even though it also has 3 series.
warnings := enforceImplicitLimitForTest(t, results, map[string]bool{"A": true}, 2)
require.Len(t, warnings, 1)
require.Contains(t, warnings[0], "A")
require.Len(t, results["A"].(*qbtypes.TimeSeriesData).Aggregations[0].Series, 2)
require.Len(t, results["B"].(*qbtypes.TimeSeriesData).Aggregations[0].Series, 3)
}
// enforceImplicitLimitForTest mirrors enforceImplicitLimit with a test-pinned limit.
func enforceImplicitLimitForTest(t *testing.T, results map[string]any, limited map[string]bool, limit int) []string {
t.Helper()
var warnings []string
for name := range limited {
result, ok := results[name]
if !ok {
continue
}
if trimGroupsToLimit(result, limit, nil) {
warnings = append(warnings, "Query "+name+" returned too many groups")
}
}
return warnings
}

View File

@@ -92,6 +92,9 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
req.Start = querybuilder.ToMilliSecs(req.Start)
req.End = querybuilder.ToMilliSecs(req.End)
// Default the group by order and cap unbounded body group by queries before building them.
req.Normalize()
event := &qbtypes.QBEvent{
Version: "v5",
NumberOfQueries: len(req.CompositeQuery.Queries),
@@ -645,6 +648,9 @@ func (q *querier) run(
}
}
// Trim capped queries before post-processing, which can reshape results and obscure counts.
warnings = append(warnings, enforceImplicitLimit(results, req.RequestType, req.CompositeQuery.Queries)...)
gomaps.Copy(results, preseededResults)
processedResults, err := q.postProcessResults(ctx, orgID, results, req)
if err != nil {

View File

@@ -71,6 +71,10 @@ type QueryBuilderQuery[T any] struct {
// ShiftBy is extracted from timeShift function for internal use
// This field is not serialized to JSON
ShiftBy int64 `json:"-"`
// implicitLimit is the row cap we imposed when the query had no explicit limit; 0 means the
// user set the limit (or none was imposed). The querier trims results back to this value.
implicitLimit int `json:"-"`
}
// PrepareJSONSchema pins `signal` to the single value implied by the aggregation

View File

@@ -466,6 +466,65 @@ func (q *QueryEnvelope) UseDefaultOrderByForListQuery() {
}
}
// DefaultGroupByLimit caps groups for a group by query with no explicit limit. The query runs
// with one extra row so the querier can detect when the cap was hit and warn.
const DefaultGroupByLimit = 1000
// Normalize prepares scalar/time series group by queries: it makes the implicit order explicit
// and caps unbounded body group by queries at DefaultGroupByLimit+1 rows, recording the implicit
// limit so the querier can trim and warn.
func (r *QueryRangeRequest) Normalize() {
if r.RequestType != RequestTypeScalar && r.RequestType != RequestTypeTimeSeries {
return
}
for idx := range r.CompositeQuery.Queries {
qe := &r.CompositeQuery.Queries[idx]
qe.UseDefaultGroupByOrder()
// Only body group by is capped for now; relax this gate to cap others.
if qe.GetLimit() == 0 && qe.HasBodyContextGroupBy() {
qe.SetImplicitLimit(DefaultGroupByLimit)
qe.SetLimit(DefaultGroupByLimit + 1)
}
}
}
// UseDefaultGroupByOrder defaults a log/trace group by query's order to the first aggregation
// descending (the SQL default) when none is set. Metrics order themselves.
func (q *QueryEnvelope) UseDefaultGroupByOrder() {
if len(q.GetGroupBy()) == 0 {
return
}
var aggExpr string
switch spec := q.Spec.(type) {
case QueryBuilderQuery[LogAggregation]:
if len(spec.Aggregations) > 0 {
aggExpr = spec.Aggregations[0].Expression
}
case QueryBuilderQuery[TraceAggregation]:
if len(spec.Aggregations) > 0 {
aggExpr = spec.Aggregations[0].Expression
}
default:
return
}
order := q.GetOrder()
if len(order) == 0 && aggExpr != "" {
order = append(order, OrderBy{
Key: OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: aggExpr}},
Direction: OrderDirectionDesc,
})
}
q.SetOrder(order)
}
func (r *QueryRangeRequest) FuncsForQuery(name string) []Function {
funcs := []Function{}
for _, query := range r.CompositeQuery.Queries {

View File

@@ -169,6 +169,30 @@ func (q *QueryEnvelope) GetLimit() int {
return 0
}
// GetImplicitLimit returns the row cap we imposed on an unbounded query, or 0 if the user set
// the limit.
func (q *QueryEnvelope) GetImplicitLimit() int {
switch spec := q.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
return spec.implicitLimit
case QueryBuilderQuery[LogAggregation]:
return spec.implicitLimit
case QueryBuilderQuery[MetricAggregation]:
return spec.implicitLimit
}
return 0
}
// HasBodyContextGroupBy reports whether any group by column is in the log body context.
func (q *QueryEnvelope) HasBodyContextGroupBy() bool {
for _, key := range q.GetGroupBy() {
if key.FieldContext == telemetrytypes.FieldContextBody {
return true
}
}
return false
}
// GetOffset returns the row offset.
func (q *QueryEnvelope) GetOffset() int {
switch spec := q.Spec.(type) {

View File

@@ -194,6 +194,22 @@ func (q *QueryEnvelope) SetLimit(limit int) {
}
}
// SetImplicitLimit records the row cap we imposed on an unbounded query so the querier can trim
// results back to it and warn.
func (q *QueryEnvelope) SetImplicitLimit(limit int) {
switch spec := q.Spec.(type) {
case QueryBuilderQuery[TraceAggregation]:
spec.implicitLimit = limit
q.Spec = spec
case QueryBuilderQuery[LogAggregation]:
spec.implicitLimit = limit
q.Spec = spec
case QueryBuilderQuery[MetricAggregation]:
spec.implicitLimit = limit
q.Spec = spec
}
}
// SetOffset sets the row offset of the spec, if applicable.
func (q *QueryEnvelope) SetOffset(offset int) {
switch spec := q.Spec.(type) {

View File

@@ -0,0 +1,130 @@
import json
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
import pytest
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.logs import Logs
from fixtures.querier import (
build_group_by_field,
build_logs_aggregation,
build_scalar_query,
get_all_series,
get_all_warnings,
get_scalar_table_data,
make_query_request,
)
DEFAULT_GROUP_BY_LIMIT = 1000
def _uid_log(now: datetime, seq: int, service: str, uid: str) -> Logs:
"""A log with a distinct body field `uid` and attribute `req_id`."""
return Logs(
timestamp=now - timedelta(seconds=5, milliseconds=seq),
resources={"service.name": service},
attributes={"req_id": f"r{seq:05d}"},
body_v2=json.dumps({"uid": uid}),
body_promoted="",
severity_text="INFO",
)
def _group_by(
signoz: types.SigNoz,
token: str,
start_ms: int,
end_ms: int,
*,
request_type: str,
group_by: list[dict],
limit: int | None,
filter_expression: str,
) -> requests.Response:
response = make_query_request(
signoz=signoz,
token=token,
start_ms=start_ms,
end_ms=end_ms,
queries=[
build_scalar_query(
name="A",
signal="logs",
aggregations=[build_logs_aggregation("count()")],
group_by=group_by,
limit=limit,
filter_expression=filter_expression,
)
],
request_type=request_type,
)
assert response.status_code == 200, response.text
assert response.json()["status"] == "success", response.text
return response
def _group_count(response: requests.Response, request_type: str) -> int:
"""Number of groups returned — table rows for scalar, series for time series."""
body = response.json()
if request_type == "scalar":
return len(get_scalar_table_data(body))
return len(get_all_series(body, "A"))
def _has_limit_warning(response: requests.Response) -> bool:
return any(f"limited to {DEFAULT_GROUP_BY_LIMIT}" in w.get("message", "") for w in get_all_warnings(response.json()))
@pytest.mark.parametrize("request_type", ["scalar", "time_series"])
def test_groupby_body_default_limit(
request_type: str,
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[list[Logs]], None],
export_json_types: Callable[[list[Logs]], None],
) -> None:
"""
An unbounded group by on a log body field is capped to a default limit; an
explicit limit or a non-body group by opts out. Holds for both scalar and time
series request types (the cap is on table rows / series respectively).
Seed > 1000 groups under a unique service, then assert:
1. Body group by, no explicit limit -> capped to 1000 groups + a "limited to
1000" warning.
2. Body group by, explicit limit -> no cap, no warning (an explicit limit opts
out of the default cap).
3. Non-body (attribute) group by, no limit -> not capped, no warning (the cap is
scoped to body group by).
"""
service = f"groupby-limit-{request_type}-svc"
filter_expression = f"service.name = '{service}'"
group_count = DEFAULT_GROUP_BY_LIMIT + 1
now = datetime.now(tz=UTC)
logs_list = [_uid_log(now, i, service, f"u{i:05d}") for i in range(group_count)]
export_json_types(logs_list)
insert_logs(logs_list)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
start_ms = int((now - timedelta(seconds=60)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
# 1. Body group by, no explicit limit -> capped to 1000 + warning.
capped = _group_by(signoz, token, start_ms, end_ms, request_type=request_type, group_by=[{"name": "body.uid"}], limit=None, filter_expression=filter_expression)
assert _group_count(capped, request_type) == DEFAULT_GROUP_BY_LIMIT
assert _has_limit_warning(capped), get_all_warnings(capped.json())
# 2. Body group by with an explicit limit -> no cap, no warning.
explicit = _group_by(signoz, token, start_ms, end_ms, request_type=request_type, group_by=[{"name": "body.uid"}], limit=group_count + 100, filter_expression=filter_expression)
assert _group_count(explicit, request_type) == group_count
assert get_all_warnings(explicit.json()) == []
# 3. Non-body (attribute) group by, no limit -> not capped (body-scoped).
attr = _group_by(signoz, token, start_ms, end_ms, request_type=request_type, group_by=[build_group_by_field("req_id", "string", "attribute")], limit=None, filter_expression=filter_expression)
assert _group_count(attr, request_type) == group_count
assert get_all_warnings(attr.json()) == []