mirror of
https://github.com/SigNoz/signoz.git
synced 2026-07-02 04:40:37 +01:00
Compare commits
2 Commits
feat/authz
...
tvats-boun
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e5a8877018 | ||
|
|
1f6b818950 |
147
pkg/querier/implicit_limit.go
Normal file
147
pkg/querier/implicit_limit.go
Normal file
@@ -0,0 +1,147 @@
|
||||
package querier
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
)
|
||||
|
||||
// implicitLimitWarn is the fmt template for the warning attached to a query whose result was
// trimmed to its implicit limit. Its verbs are, in order, the query name (%s) and the limit (%d).
// It is a constant so it cannot be reassigned at runtime.
const implicitLimitWarn = "Query %s returned too many results; output has been limited to %d. Add an explicit limit to the query to override this."
|
||||
|
||||
// enforceImplicitLimit trims each query that QueryRangeRequest.Normalize capped down to its
|
||||
// implicit limit, honoring its order, and returns a warning per trimmed query.
|
||||
func enforceImplicitLimit(results map[string]any, requestType qbtypes.RequestType, queries []qbtypes.QueryEnvelope) []string {
|
||||
var warnings []string
|
||||
for idx := range queries {
|
||||
qe := &queries[idx]
|
||||
|
||||
limit := qe.GetImplicitLimit()
|
||||
if limit == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
name := qe.GetQueryName()
|
||||
result, ok := results[name]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if requestType.IsAggregation() {
|
||||
if trimGroupsToLimit(result, limit, qe.GetOrder()) {
|
||||
warnings = append(warnings, fmt.Sprintf(implicitLimitWarn, name, limit))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return warnings
|
||||
}
|
||||
|
||||
// trimGroupsToLimit clips a result to at most limit groups and reports whether it had to trim.
|
||||
func trimGroupsToLimit(result any, limit int, orderBy []qbtypes.OrderBy) bool {
|
||||
switch value := result.(type) {
|
||||
case *qbtypes.TimeSeriesData:
|
||||
if len(value.Aggregations) == 0 || len(value.Aggregations[0].Series) <= limit {
|
||||
return false
|
||||
}
|
||||
|
||||
// consume yields series in map (non-deterministic) order, so re-derive the CTE's top-N by
|
||||
// the query order: aggregation keys rank by series total (not per-bucket average), group
|
||||
// keys by label. All buckets share the same group set, so pick survivors once and filter.
|
||||
ranked := append([]*qbtypes.TimeSeries(nil), value.Aggregations[0].Series...)
|
||||
slices.SortStableFunc(ranked, func(a, b *qbtypes.TimeSeries) int {
|
||||
return compareSeriesForTrim(a, b, orderBy)
|
||||
})
|
||||
|
||||
keep := make(map[string]struct{}, limit)
|
||||
for _, s := range ranked[:limit] {
|
||||
keep[qbtypes.GetUniqueSeriesKey(s.Labels)] = struct{}{}
|
||||
}
|
||||
|
||||
for _, bucket := range value.Aggregations {
|
||||
kept := make([]*qbtypes.TimeSeries, 0, len(keep))
|
||||
for _, s := range bucket.Series {
|
||||
if _, ok := keep[qbtypes.GetUniqueSeriesKey(s.Labels)]; ok {
|
||||
kept = append(kept, s)
|
||||
}
|
||||
}
|
||||
bucket.Series = kept
|
||||
}
|
||||
return true
|
||||
case *qbtypes.ScalarData:
|
||||
if len(value.Data) <= limit {
|
||||
return false
|
||||
}
|
||||
|
||||
// Rows arrive SQL-ordered; re-sort by the aggregation only when no order was given.
|
||||
if len(orderBy) == 0 {
|
||||
sortByFirstAggregation(value.Data, value.Columns)
|
||||
}
|
||||
value.Data = value.Data[:limit]
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// compareSeriesForTrim orders two series by the query order for trimming: a group by key is
|
||||
// compared by its label value, anything else (an aggregation) by series total. With no order it
|
||||
// falls back to total descending.
|
||||
func compareSeriesForTrim(a, b *qbtypes.TimeSeries, orderBy []qbtypes.OrderBy) int {
|
||||
if len(orderBy) == 0 {
|
||||
return cmpFloat(seriesTotal(b), seriesTotal(a))
|
||||
}
|
||||
|
||||
for _, o := range orderBy {
|
||||
var c int
|
||||
if va, ok := seriesLabelValue(a, o.Key.Name); ok {
|
||||
vb, _ := seriesLabelValue(b, o.Key.Name)
|
||||
c = strings.Compare(va, vb)
|
||||
} else {
|
||||
c = cmpFloat(seriesTotal(a), seriesTotal(b))
|
||||
}
|
||||
|
||||
if c != 0 {
|
||||
if o.Direction == qbtypes.OrderDirectionDesc {
|
||||
return -c
|
||||
}
|
||||
return c
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
// seriesTotal sums a series' evaluable values — the per-group total the limit CTE ranks by.
|
||||
func seriesTotal(s *qbtypes.TimeSeries) float64 {
|
||||
var total float64
|
||||
for _, v := range s.EvaluableValues() {
|
||||
total += v.Value
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
// seriesLabelValue returns the string value of the series label with the given key name.
|
||||
func seriesLabelValue(s *qbtypes.TimeSeries, name string) (string, bool) {
|
||||
for _, l := range s.Labels {
|
||||
if l.Key.Name == name {
|
||||
if str, ok := l.Value.(string); ok {
|
||||
return str, true
|
||||
}
|
||||
return fmt.Sprintf("%v", l.Value), true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
// cmpFloat three-way compares a and b, returning -1 if a < b, +1 if a > b and 0 if they are
// equal. NaN is ordered before every other value and equal to itself (the same convention as the
// standard library's cmp.Compare), so the result is a consistent ordering that is safe to use
// inside a sort comparator. Without that, NaN would compare "equal" to everything.
func cmpFloat(a, b float64) int {
	// x != x holds only for NaN.
	aNaN, bNaN := a != a, b != b

	switch {
	case aNaN && bNaN:
		return 0
	case aNaN:
		return -1
	case bNaN:
		return 1
	case a < b:
		return -1
	case a > b:
		return 1
	default:
		return 0
	}
}
|
||||
138
pkg/querier/implicit_limit_test.go
Normal file
138
pkg/querier/implicit_limit_test.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package querier
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func tsWithValue(label string, value float64) *qbtypes.TimeSeries {
|
||||
return &qbtypes.TimeSeries{
|
||||
Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "body.k"}, Value: label}},
|
||||
Values: []*qbtypes.TimeSeriesValue{{Timestamp: 1, Value: value}},
|
||||
}
|
||||
}
|
||||
|
||||
func tsWithValues(label string, values ...float64) *qbtypes.TimeSeries {
|
||||
pts := make([]*qbtypes.TimeSeriesValue, len(values))
|
||||
for i, v := range values {
|
||||
pts[i] = &qbtypes.TimeSeriesValue{Timestamp: int64(i + 1), Value: v}
|
||||
}
|
||||
return &qbtypes.TimeSeries{
|
||||
Labels: []*qbtypes.Label{{Key: telemetrytypes.TelemetryFieldKey{Name: "body.k"}, Value: label}},
|
||||
Values: pts,
|
||||
}
|
||||
}
|
||||
|
||||
// seriesLabel returns the value of the first label on s as a string. It panics when s has no
// labels or the value is not a string, which is acceptable for these test fixtures.
func seriesLabel(s *qbtypes.TimeSeries) string {
	first := s.Labels[0]
	return first.Value.(string)
}
|
||||
|
||||
// A is spread thin (total 5, avg ~1.25); B/C are concentrated (avg 3, 2). Ranking by average
|
||||
// would drop the high-total A. The fix keeps the top groups by total: A and B.
|
||||
func TestTrimGroupsToLimit_TimeSeriesKeepsTopByTotalNotAverage(t *testing.T) {
|
||||
tsd := &qbtypes.TimeSeriesData{
|
||||
Aggregations: []*qbtypes.AggregationBucket{{
|
||||
Series: []*qbtypes.TimeSeries{
|
||||
tsWithValues("A", 1, 1, 1, 1, 1), // total 5, avg 1
|
||||
tsWithValue("B", 3), // total 3, avg 3
|
||||
tsWithValue("C", 2), // total 2, avg 2
|
||||
},
|
||||
}},
|
||||
}
|
||||
|
||||
trimmed := trimGroupsToLimit(tsd, 2, nil)
|
||||
|
||||
require.True(t, trimmed)
|
||||
kept := []string{seriesLabel(tsd.Aggregations[0].Series[0]), seriesLabel(tsd.Aggregations[0].Series[1])}
|
||||
require.ElementsMatch(t, []string{"A", "B"}, kept, "must keep top-2 by total (A=5, B=3), not by average")
|
||||
}
|
||||
|
||||
// Series in ascending value order: a naive Series[:limit] keeps the lowest; the trim must not.
|
||||
func TestTrimGroupsToLimit_TimeSeriesKeepsTopByValue(t *testing.T) {
|
||||
tsd := &qbtypes.TimeSeriesData{
|
||||
Aggregations: []*qbtypes.AggregationBucket{{
|
||||
Series: []*qbtypes.TimeSeries{
|
||||
tsWithValue("a", 1),
|
||||
tsWithValue("b", 2),
|
||||
tsWithValue("c", 3),
|
||||
},
|
||||
}},
|
||||
}
|
||||
|
||||
trimmed := trimGroupsToLimit(tsd, 2, nil)
|
||||
|
||||
require.True(t, trimmed)
|
||||
require.Len(t, tsd.Aggregations[0].Series, 2)
|
||||
|
||||
kept := []float64{
|
||||
tsd.Aggregations[0].Series[0].Values[0].Value,
|
||||
tsd.Aggregations[0].Series[1].Values[0].Value,
|
||||
}
|
||||
require.ElementsMatch(t, []float64{3, 2}, kept, "must keep the top-2 by value, not an arbitrary subset")
|
||||
}
|
||||
|
||||
func TestTrimGroupsToLimit_ScalarKeepsTopByValueViaSeriesLimit(t *testing.T) {
|
||||
// Rows not in value order: the trim must re-sort and keep b=3, c=2, not the leading rows.
|
||||
sd := &qbtypes.ScalarData{
|
||||
Columns: []*qbtypes.ColumnDescriptor{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "body.k"}, Type: qbtypes.ColumnTypeGroup},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "__result_0"}, Type: qbtypes.ColumnTypeAggregation},
|
||||
},
|
||||
Data: [][]any{{"a", 1.0}, {"b", 3.0}, {"c", 2.0}},
|
||||
}
|
||||
|
||||
trimmed := trimGroupsToLimit(sd, 2, nil)
|
||||
|
||||
require.True(t, trimmed)
|
||||
require.Len(t, sd.Data, 2)
|
||||
require.Equal(t, []any{"b", 3.0}, sd.Data[0])
|
||||
require.Equal(t, []any{"c", 2.0}, sd.Data[1])
|
||||
}
|
||||
|
||||
func TestTrimGroupsToLimit_NoTrimWhenUnderLimit(t *testing.T) {
|
||||
tsd := &qbtypes.TimeSeriesData{
|
||||
Aggregations: []*qbtypes.AggregationBucket{{
|
||||
Series: []*qbtypes.TimeSeries{tsWithValue("a", 1), tsWithValue("b", 2)},
|
||||
}},
|
||||
}
|
||||
|
||||
require.False(t, trimGroupsToLimit(tsd, 5, nil))
|
||||
require.Len(t, tsd.Aggregations[0].Series, 2)
|
||||
}
|
||||
|
||||
func TestEnforceImplicitLimit_WarnsAndTrimsOnlyLimitedQueries(t *testing.T) {
|
||||
results := map[string]any{
|
||||
"A": &qbtypes.TimeSeriesData{QueryName: "A", Aggregations: []*qbtypes.AggregationBucket{{
|
||||
Series: []*qbtypes.TimeSeries{tsWithValue("a", 1), tsWithValue("b", 2), tsWithValue("c", 3)},
|
||||
}}},
|
||||
"B": &qbtypes.TimeSeriesData{QueryName: "B", Aggregations: []*qbtypes.AggregationBucket{{
|
||||
Series: []*qbtypes.TimeSeries{tsWithValue("x", 9), tsWithValue("y", 8), tsWithValue("z", 7)},
|
||||
}}},
|
||||
}
|
||||
|
||||
// Only A is marked as capped; B must be left untouched even though it also has 3 series.
|
||||
warnings := enforceImplicitLimitForTest(t, results, map[string]bool{"A": true}, 2)
|
||||
|
||||
require.Len(t, warnings, 1)
|
||||
require.Contains(t, warnings[0], "A")
|
||||
require.Len(t, results["A"].(*qbtypes.TimeSeriesData).Aggregations[0].Series, 2)
|
||||
require.Len(t, results["B"].(*qbtypes.TimeSeriesData).Aggregations[0].Series, 3)
|
||||
}
|
||||
|
||||
// enforceImplicitLimitForTest mirrors enforceImplicitLimit with a test-pinned limit.
|
||||
func enforceImplicitLimitForTest(t *testing.T, results map[string]any, limited map[string]bool, limit int) []string {
|
||||
t.Helper()
|
||||
|
||||
var warnings []string
|
||||
for name := range limited {
|
||||
result, ok := results[name]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if trimGroupsToLimit(result, limit, nil) {
|
||||
warnings = append(warnings, "Query "+name+" returned too many groups")
|
||||
}
|
||||
}
|
||||
return warnings
|
||||
}
|
||||
@@ -92,6 +92,9 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
req.Start = querybuilder.ToMilliSecs(req.Start)
|
||||
req.End = querybuilder.ToMilliSecs(req.End)
|
||||
|
||||
// Default the group by order and cap unbounded body group by queries before building them.
|
||||
req.Normalize()
|
||||
|
||||
event := &qbtypes.QBEvent{
|
||||
Version: "v5",
|
||||
NumberOfQueries: len(req.CompositeQuery.Queries),
|
||||
@@ -645,6 +648,9 @@ func (q *querier) run(
|
||||
}
|
||||
}
|
||||
|
||||
// Trim capped queries before post-processing, which can reshape results and obscure counts.
|
||||
warnings = append(warnings, enforceImplicitLimit(results, req.RequestType, req.CompositeQuery.Queries)...)
|
||||
|
||||
gomaps.Copy(results, preseededResults)
|
||||
processedResults, err := q.postProcessResults(ctx, orgID, results, req)
|
||||
if err != nil {
|
||||
|
||||
@@ -71,6 +71,10 @@ type QueryBuilderQuery[T any] struct {
|
||||
// ShiftBy is extracted from timeShift function for internal use
|
||||
// This field is not serialized to JSON
|
||||
ShiftBy int64 `json:"-"`
|
||||
|
||||
// implicitLimit is the row cap we imposed when the query had no explicit limit; 0 means the
|
||||
// user set the limit (or none was imposed). The querier trims results back to this value.
|
||||
implicitLimit int `json:"-"`
|
||||
}
|
||||
|
||||
// PrepareJSONSchema pins `signal` to the single value implied by the aggregation
|
||||
|
||||
@@ -466,6 +466,65 @@ func (q *QueryEnvelope) UseDefaultOrderByForListQuery() {
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultGroupByLimit caps groups for a group by query with no explicit limit. The query runs
|
||||
// with one extra row so the querier can detect when the cap was hit and warn.
|
||||
const DefaultGroupByLimit = 1000
|
||||
|
||||
// Normalize prepares scalar/time series group by queries: it makes the implicit order explicit
|
||||
// and caps unbounded body group by queries at DefaultGroupByLimit+1 rows, recording the implicit
|
||||
// limit so the querier can trim and warn.
|
||||
func (r *QueryRangeRequest) Normalize() {
|
||||
if r.RequestType != RequestTypeScalar && r.RequestType != RequestTypeTimeSeries {
|
||||
return
|
||||
}
|
||||
|
||||
for idx := range r.CompositeQuery.Queries {
|
||||
qe := &r.CompositeQuery.Queries[idx]
|
||||
qe.UseDefaultGroupByOrder()
|
||||
|
||||
// Only body group by is capped for now; relax this gate to cap others.
|
||||
if qe.GetLimit() == 0 && qe.HasBodyContextGroupBy() {
|
||||
qe.SetImplicitLimit(DefaultGroupByLimit)
|
||||
qe.SetLimit(DefaultGroupByLimit + 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// UseDefaultGroupByOrder defaults a log/trace group by query's order to the first aggregation
|
||||
// descending (the SQL default) when none is set. Metrics order themselves.
|
||||
func (q *QueryEnvelope) UseDefaultGroupByOrder() {
|
||||
|
||||
if len(q.GetGroupBy()) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var aggExpr string
|
||||
|
||||
switch spec := q.Spec.(type) {
|
||||
case QueryBuilderQuery[LogAggregation]:
|
||||
if len(spec.Aggregations) > 0 {
|
||||
aggExpr = spec.Aggregations[0].Expression
|
||||
}
|
||||
case QueryBuilderQuery[TraceAggregation]:
|
||||
if len(spec.Aggregations) > 0 {
|
||||
aggExpr = spec.Aggregations[0].Expression
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
order := q.GetOrder()
|
||||
|
||||
if len(order) == 0 && aggExpr != "" {
|
||||
order = append(order, OrderBy{
|
||||
Key: OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: aggExpr}},
|
||||
Direction: OrderDirectionDesc,
|
||||
})
|
||||
}
|
||||
|
||||
q.SetOrder(order)
|
||||
}
|
||||
|
||||
func (r *QueryRangeRequest) FuncsForQuery(name string) []Function {
|
||||
funcs := []Function{}
|
||||
for _, query := range r.CompositeQuery.Queries {
|
||||
|
||||
@@ -169,6 +169,30 @@ func (q *QueryEnvelope) GetLimit() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
// GetImplicitLimit returns the row cap we imposed on an unbounded query, or 0 if the user set
|
||||
// the limit.
|
||||
func (q *QueryEnvelope) GetImplicitLimit() int {
|
||||
switch spec := q.Spec.(type) {
|
||||
case QueryBuilderQuery[TraceAggregation]:
|
||||
return spec.implicitLimit
|
||||
case QueryBuilderQuery[LogAggregation]:
|
||||
return spec.implicitLimit
|
||||
case QueryBuilderQuery[MetricAggregation]:
|
||||
return spec.implicitLimit
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// HasBodyContextGroupBy reports whether any group by column is in the log body context.
|
||||
func (q *QueryEnvelope) HasBodyContextGroupBy() bool {
|
||||
for _, key := range q.GetGroupBy() {
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetOffset returns the row offset.
|
||||
func (q *QueryEnvelope) GetOffset() int {
|
||||
switch spec := q.Spec.(type) {
|
||||
|
||||
@@ -194,6 +194,22 @@ func (q *QueryEnvelope) SetLimit(limit int) {
|
||||
}
|
||||
}
|
||||
|
||||
// SetImplicitLimit records the row cap we imposed on an unbounded query so the querier can trim
|
||||
// results back to it and warn.
|
||||
func (q *QueryEnvelope) SetImplicitLimit(limit int) {
|
||||
switch spec := q.Spec.(type) {
|
||||
case QueryBuilderQuery[TraceAggregation]:
|
||||
spec.implicitLimit = limit
|
||||
q.Spec = spec
|
||||
case QueryBuilderQuery[LogAggregation]:
|
||||
spec.implicitLimit = limit
|
||||
q.Spec = spec
|
||||
case QueryBuilderQuery[MetricAggregation]:
|
||||
spec.implicitLimit = limit
|
||||
q.Spec = spec
|
||||
}
|
||||
}
|
||||
|
||||
// SetOffset sets the row offset of the spec, if applicable.
|
||||
func (q *QueryEnvelope) SetOffset(offset int) {
|
||||
switch spec := q.Spec.(type) {
|
||||
|
||||
130
tests/integration/tests/querier_json_body/03_group_by.py
Normal file
130
tests/integration/tests/querier_json_body/03_group_by.py
Normal file
@@ -0,0 +1,130 @@
|
||||
import json
|
||||
from collections.abc import Callable
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.logs import Logs
|
||||
from fixtures.querier import (
|
||||
build_group_by_field,
|
||||
build_logs_aggregation,
|
||||
build_scalar_query,
|
||||
get_all_series,
|
||||
get_all_warnings,
|
||||
get_scalar_table_data,
|
||||
make_query_request,
|
||||
)
|
||||
|
||||
DEFAULT_GROUP_BY_LIMIT = 1000
|
||||
|
||||
|
||||
def _uid_log(now: datetime, seq: int, service: str, uid: str) -> Logs:
    """Build one INFO log for `service` whose JSON body is `{"uid": uid}`.

    The timestamp is `now` minus 5 seconds and `seq` milliseconds, and the
    `req_id` attribute is derived from `seq`, so each sequence number yields a
    distinct timestamp and attribute value.
    """
    age = timedelta(seconds=5, milliseconds=seq)
    body = json.dumps({"uid": uid})
    return Logs(
        timestamp=now - age,
        resources={"service.name": service},
        attributes={"req_id": f"r{seq:05d}"},
        body_v2=body,
        body_promoted="",
        severity_text="INFO",
    )
|
||||
|
||||
|
||||
def _group_by(
    signoz: types.SigNoz,
    token: str,
    start_ms: int,
    end_ms: int,
    *,
    request_type: str,
    group_by: list[dict],
    limit: int | None,
    filter_expression: str,
) -> requests.Response:
    """Run a single logs `count()` query named "A" grouped by `group_by`.

    Asserts that the request returned HTTP 200 with status "success" and
    returns the raw response for further inspection.
    """
    query = build_scalar_query(
        name="A",
        signal="logs",
        aggregations=[build_logs_aggregation("count()")],
        group_by=group_by,
        limit=limit,
        filter_expression=filter_expression,
    )
    response = make_query_request(
        signoz=signoz,
        token=token,
        start_ms=start_ms,
        end_ms=end_ms,
        queries=[query],
        request_type=request_type,
    )

    assert response.status_code == 200, response.text
    assert response.json()["status"] == "success", response.text
    return response
|
||||
|
||||
|
||||
def _group_count(response: requests.Response, request_type: str) -> int:
    """Count the groups in a response: table rows for a "scalar" request,
    otherwise the series of query "A"."""
    payload = response.json()
    groups = get_scalar_table_data(payload) if request_type == "scalar" else get_all_series(payload, "A")
    return len(groups)
|
||||
|
||||
|
||||
def _has_limit_warning(response: requests.Response) -> bool:
    """Whether any warning in the response mentions the default group by limit."""
    needle = f"limited to {DEFAULT_GROUP_BY_LIMIT}"
    for warning in get_all_warnings(response.json()):
        if needle in warning.get("message", ""):
            return True
    return False
|
||||
|
||||
|
||||
@pytest.mark.parametrize("request_type", ["scalar", "time_series"])
def test_groupby_body_default_limit(
    request_type: str,
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[list[Logs]], None],
    export_json_types: Callable[[list[Logs]], None],
) -> None:
    """
    A group by on a log body field with no limit is capped to a default limit;
    an explicit limit or a non-body group by opts out. Checked for both scalar
    and time series requests (the cap applies to table rows / series).

    One more group than the default limit is seeded under a unique service, then:
      1. Body group by, no explicit limit -> exactly the default number of
         groups plus a "limited to 1000" warning.
      2. Body group by, explicit limit -> every group returned, no warning.
      3. Attribute group by, no limit -> every group returned, no warning (the
         cap is scoped to body group by).
    """
    service = f"groupby-limit-{request_type}-svc"
    filter_expression = f"service.name = '{service}'"
    group_count = DEFAULT_GROUP_BY_LIMIT + 1

    now = datetime.now(tz=UTC)
    logs_list = [_uid_log(now, i, service, f"u{i:05d}") for i in range(group_count)]

    export_json_types(logs_list)
    insert_logs(logs_list)
    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    start_ms = int((now - timedelta(seconds=60)).timestamp() * 1000)
    end_ms = int(now.timestamp() * 1000)

    def run(group_by: list[dict], limit: int | None) -> requests.Response:
        # Every scenario shares the window, filter and request type; only the
        # group by key and the limit vary.
        return _group_by(
            signoz,
            token,
            start_ms,
            end_ms,
            request_type=request_type,
            group_by=group_by,
            limit=limit,
            filter_expression=filter_expression,
        )

    body_key = [{"name": "body.uid"}]

    # 1. Body group by, no explicit limit -> capped to 1000 + warning.
    capped = run(body_key, None)
    assert _group_count(capped, request_type) == DEFAULT_GROUP_BY_LIMIT
    assert _has_limit_warning(capped), get_all_warnings(capped.json())

    # 2. Body group by with an explicit limit -> no cap, no warning.
    explicit = run([{"name": "body.uid"}], group_count + 100)
    assert _group_count(explicit, request_type) == group_count
    assert get_all_warnings(explicit.json()) == []

    # 3. Non-body (attribute) group by, no limit -> not capped (body-scoped).
    attr = run([build_group_by_field("req_id", "string", "attribute")], None)
    assert _group_count(attr, request_type) == group_count
    assert get_all_warnings(attr.json()) == []
|
||||
Reference in New Issue
Block a user