Compare commits

...

1 Commits

Author SHA1 Message Date
Tushar Vats
a8c5bbdf17 fix: qb warnings 2026-06-01 04:43:50 +05:30
5 changed files with 457 additions and 57 deletions

View File

@@ -679,21 +679,7 @@ func (bc *bucketCache) mergeAndDeduplicateBuckets(existing, fresh []*qbtypes.Cac
// deduplicateWarnings removes exact-duplicate warnings, keeping the first
// occurrence of each message in its original position. It returns nil when
// warnings is empty.
//
// The work is delegated to the package-level dedupeWarnings helper so that the
// bucket cache and the querier response path share a single implementation
// instead of maintaining two copies of the same loop.
func (bc *bucketCache) deduplicateWarnings(warnings []string) []string {
	return dedupeWarnings(warnings)
}
// trimResultToFluxBoundary trims the result to exclude data points beyond the flux boundary.

View File

@@ -642,6 +642,12 @@ func (q *querier) run(
},
}
// Warnings can arrive duplicated: the bucket cache returns the cached
// portion's warnings alongside an identical warning emitted by every
// freshly-executed missing range (see mergeResults), and distinct queries
// can surface the same warning. Collapse exact duplicates before building
// the response.
warnings = dedupeWarnings(warnings)
if len(warnings) != 0 {
warns := make([]qbtypes.QueryWarnDataAdditional, len(warnings))
for i, warning := range warnings {
@@ -1125,6 +1131,8 @@ func (q *querier) adjustStepInterval(queries []qbtypes.QueryEnvelope, start, end
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
if qe.GetSource() == telemetrytypes.SourceMeter {
clampStep(qe, meterRecommended, meterMin, &warnings)
// we don't want to return warnings for meter metrics.
warnings = nil
} else {
clampStep(qe, metricRecommended, metricMin, &warnings)
}
@@ -1140,3 +1148,21 @@ func (q *querier) adjustStepInterval(queries []qbtypes.QueryEnvelope, start, end
}
return warnings
}
// dedupeWarnings returns the distinct warning messages from warnings, in the
// order each message first appears. An empty (or nil) input yields nil, and
// the input slice itself is never modified.
//
// Membership is checked with a linear scan of the messages kept so far rather
// than a map: warning counts are tiny (a handful per request), so the
// quadratic scan avoids the allocation and hashing overhead of a set while
// preserving first-occurrence order without extra bookkeeping.
func dedupeWarnings(warnings []string) []string {
	if len(warnings) == 0 {
		return nil
	}

	kept := make([]string, 0, len(warnings))
	for i := range warnings {
		if slices.Contains(kept, warnings[i]) {
			// Already recorded; skip the repeat.
			continue
		}
		kept = append(kept, warnings[i])
	}
	return kept
}

View File

@@ -28,6 +28,42 @@ class TelemetryFieldKey:
}
class RequestType:
    """String constants for the `request_type` argument of a query request."""

    # Row-level results.
    RAW = "raw"
    # Results bucketed over time; the default used by make_query_request.
    TIME_SERIES = "time_series"
    SCALAR = "scalar"
    TABLE = "table"
@dataclass
class Aggregation:
expression: str
alias: str | None = None
def to_dict(self) -> dict:
agg: dict[str, Any] = {"expression": self.expression}
if self.alias:
agg["alias"] = self.alias
return agg
@dataclass
class MetricAggregation:
    """A metric aggregation entry for a builder query.

    Field names are snake_case here and are emitted as camelCase keys by
    `to_dict`.
    """

    metric_name: str
    time_aggregation: str
    space_aggregation: str
    # Defaults to "cumulative" when the caller does not specify one.
    temporality: str = "cumulative"

    def to_dict(self) -> dict:
        """Serialize to the request payload shape.

        Returns:
            A dict with the keys "metricName", "timeAggregation",
            "spaceAggregation" and "temporality", in that order.
        """
        return {
            "metricName": self.metric_name,
            "timeAggregation": self.time_aggregation,
            "spaceAggregation": self.space_aggregation,
            "temporality": self.temporality,
        }
@dataclass
class OrderBy:
key: TelemetryFieldKey
@@ -46,6 +82,8 @@ class BuilderQuery:
filter_expression: str | None = None
select_fields: list[TelemetryFieldKey] | None = None
order: list[OrderBy] | None = None
aggregations: list[Aggregation | MetricAggregation] | None = None
step_interval: int | None = None
def to_dict(self) -> dict:
spec: dict[str, Any] = {
@@ -62,6 +100,11 @@ class BuilderQuery:
spec["selectFields"] = [f.to_dict() for f in self.select_fields]
if self.order:
spec["order"] = [o.to_dict() if hasattr(o, "to_dict") else o for o in self.order]
if self.aggregations:
spec["aggregations"] = [agg.to_dict() if hasattr(agg, "to_dict") else agg for agg in self.aggregations]
if self.step_interval is not None:
spec["stepInterval"] = self.step_interval
return {"type": "builder_query", "spec": spec}
@@ -117,7 +160,7 @@ def make_query_request(
end_ms: int,
queries: list[dict],
*,
request_type: str = "time_series",
request_type: str = RequestType.TIME_SERIES,
format_options: dict | None = None,
variables: dict | None = None,
no_cache: bool = True,

View File

@@ -15,7 +15,6 @@ from fixtures.querier import (
build_group_by_field,
build_logs_aggregation,
build_order_by,
build_raw_query,
build_scalar_query,
find_named_result,
index_series_by_label,
@@ -2626,43 +2625,3 @@ def test_logs_aggregation_filter_by_trace_id(
orphan_count, orphan_warnings = _count(narrow_start_ms, now_ms, orphan_trace_id)
assert orphan_count == 1, f"Expected count=1 for orphan trace_id aggregation, got {orphan_count} — query may have been incorrectly short-circuited"
assert not any(outside_range_msg in m for m in orphan_warnings), f"Did not expect outside-range warning for orphan trace_id, got {orphan_warnings}"
def test_logs_list_ambigous_warnings(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[list[Logs]], None],
) -> None:
    """A raw logs query on an ambiguous key must surface an "ambiguous" warning.

    A single log carries `service.name` both as a resource and as an
    attribute; filtering on the bare key is expected to yield at least one
    warning whose message mentions "ambiguous".
    """
    java_log = Logs(
        timestamp=datetime.now(tz=UTC) - timedelta(seconds=1),
        resources={"service.name": "java"},
        attributes={"service.name": "java"},
        body="This is a log message, coming from a java application",
        severity_text="DEBUG",
    )
    insert_logs([java_log])

    response = make_query_request(
        signoz,
        get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD),
        start_ms=int((datetime.now(tz=UTC) - timedelta(minutes=1)).timestamp() * 1000),
        end_ms=int(datetime.now(tz=UTC).timestamp() * 1000),
        request_type="raw",
        queries=[build_raw_query(name="A", signal="logs", filter_expression='service.name = "java"')],
    )

    assert response.status_code == HTTPStatus.OK
    payload = response.json()
    assert payload["status"] == "success"

    warning = payload["data"].get("warning", None)
    assert warning is not None
    assert warning["message"] == "Encountered warnings"

    entries = warning.get("warnings")
    assert len(entries) > 0
    # Evaluate every entry up front, then require at least one match.
    mentions_ambiguity = ["ambiguous" in entry["message"] for entry in entries]
    assert any(mentions_ambiguity)

View File

@@ -0,0 +1,386 @@
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.logs import Logs
from fixtures.meter import MeterSample, make_meter_samples
from fixtures.querier import (
Aggregation,
BuilderQuery,
MetricAggregation,
RequestType,
make_query_request,
)
def test_resource_default_warning(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[list[Logs]], None],
) -> None:
    """Filtering on a key present as resource and attribute warns exactly once.

    Inserts one log whose `service.name` exists in both the resource and the
    attribute context, runs a raw logs query filtering on the bare key, and
    asserts the response carries a single ambiguity warning stating that the
    `resource` context is used by default.
    """
    java_log = Logs(
        timestamp=datetime.now(tz=UTC),
        resources={"service.name": "java"},
        attributes={"service.name": "java"},
        body="This is a log message, coming from a java application",
        severity_text="DEBUG",
    )
    insert_logs([java_log])

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
    query = BuilderQuery(
        name="A",
        signal="logs",
        filter_expression="service.name = 'java'",
    ).to_dict()

    response = make_query_request(
        signoz,
        token,
        start_ms=int((datetime.now(tz=UTC) - timedelta(minutes=20)).timestamp() * 1000),
        end_ms=int(datetime.now(tz=UTC).timestamp() * 1000),
        request_type=RequestType.RAW,
        queries=[query],
    )

    assert response.status_code == HTTPStatus.OK
    payload = response.json()
    assert payload["status"] == "success"

    warning = payload["data"].get("warning", None)
    assert warning is not None
    assert warning["message"] == "Encountered warnings"

    expected_service_name_warning = (
        "Key `service.name` is ambiguous, found 2 different combinations of "
        "field context / data type: [name=service.name,context=resource,datatype=string "
        "name=service.name,context=attribute,datatype=string]. Using `resource` context "
        "by default. To query attributes explicitly, use the fully qualified name "
        "(e.g., 'attribute.service.name')"
    )
    # Exact list equality: one warning, no duplicates, no extras.
    assert warning["warnings"] == [{"message": expected_service_name_warning}]
def test_key_collision_warning(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[list[Logs]], None],
) -> None:
    """Filtering on a key stored with two data types warns exactly once.

    Two logs store `http.status_code` as an attribute, once as a number and
    once as a string. A raw logs query filtering on that key is expected to
    return a single ambiguity warning listing both data types.
    """
    # Same log twice, differing only in the type of `http.status_code`.
    insert_logs(
        [
            Logs(
                timestamp=datetime.now(tz=UTC),
                resources={"service.name": "java"},
                attributes={"service.name": "java", "http.status_code": status_code},
                body="This is a log message, coming from a java application",
                severity_text="DEBUG",
            )
            for status_code in (200, "200")
        ]
    )

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
    query = BuilderQuery(
        name="A",
        signal="logs",
        filter_expression="http.status_code = '200'",
    ).to_dict()

    response = make_query_request(
        signoz,
        token,
        start_ms=int((datetime.now(tz=UTC) - timedelta(minutes=20)).timestamp() * 1000),
        end_ms=int(datetime.now(tz=UTC).timestamp() * 1000),
        request_type=RequestType.RAW,
        queries=[query],
    )

    assert response.status_code == HTTPStatus.OK
    payload = response.json()
    assert payload["status"] == "success"

    warning = payload["data"].get("warning", None)
    assert warning is not None
    assert warning["message"] == "Encountered warnings"

    expected_http_status_code_warning = "Key `http.status_code` is ambiguous, found 2 different combinations of field context / data type: [name=http.status_code,context=attribute,datatype=number name=http.status_code,context=attribute,datatype=string]."
    # Exact list equality: one warning, no duplicates, no extras.
    assert warning["warnings"] == [{"message": expected_http_status_code_warning}]
def test_deduped_warnings_for_single_query(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[list[Logs]], None],
) -> None:
    """A partially cached query must report each distinct warning only once.

    The same time-series query is issued twice with caching enabled: first
    over [-90m, -30m], then over [-90m, -20m]. The final response is expected
    to list each ambiguity warning exactly once.
    """
    log_body = "This is a log message, coming from a java application"
    # Ten logs spaced 10 minutes apart with a numeric status code ...
    numeric_status_logs = [
        Logs(
            timestamp=datetime.now(tz=UTC) - timedelta(minutes=i * 10),
            resources={"service.name": "java"},
            attributes={"service.name": "java", "http.status_code": 200},
            body=log_body,
            severity_text="DEBUG",
        )
        for i in range(10)
    ]
    # ... plus one log storing the same key as a string.
    string_status_log = Logs(
        timestamp=datetime.now(tz=UTC),
        resources={"service.name": "java"},
        attributes={"service.name": "java", "http.status_code": "200"},
        body=log_body,
        severity_text="DEBUG",
    )
    insert_logs([*numeric_status_logs, string_status_log])

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Both `service.name` (resource vs attribute) and `http.status_code`
    # (number vs string) are ambiguous keys, so using them in the filter makes
    # the querier emit an "is ambiguous" warning while *building* the query.
    #
    # The path under test is the bucket-cache warning merge: the cache keeps a
    # query's warnings next to its buckets, and on a partial cache hit
    # executeWithCache merges those cached warnings with the ones re-emitted
    # by every freshly executed missing range (querier.mergeResults). Before
    # the dedup fix the same message showed up once per executed range.
    query = BuilderQuery(
        name="A",
        signal="logs",
        step_interval=600,
        filter_expression="service.name = 'java' and http.status_code = 200",
        aggregations=[Aggregation(expression="count()")],
    ).to_dict()

    # A single "now" anchors both windows so their step-aligned boundaries
    # line up. Both windows end well before the 5m flux interval, which keeps
    # the results cacheable.
    now_ms = int(datetime.now(tz=UTC).timestamp() * 1000)
    minute = 60 * 1000

    def query_until(end_minutes_ago: int):
        # Cached time-series request over [-90m, -end_minutes_ago].
        return make_query_request(
            signoz,
            token,
            start_ms=now_ms - 90 * minute,
            end_ms=now_ms - end_minutes_ago * minute,
            request_type=RequestType.TIME_SERIES,
            queries=[query],
            no_cache=False,
        )

    # Populate the cache for [-90m, -30m], warnings included.
    first = query_until(30)
    assert first.status_code == HTTPStatus.OK
    assert first.json()["status"] == "success"

    # The second request reuses the cached [-90m, -30m] buckets (with their
    # cached warnings) and runs only the trailing [-30m, -20m] step fresh,
    # re-emitting the same warnings and so exercising the cache/fresh merge.
    response = query_until(20)
    assert response.status_code == HTTPStatus.OK
    payload = response.json()
    assert payload["status"] == "success"

    warning = payload["data"].get("warning", None)
    assert warning is not None
    assert warning["message"] == "Encountered warnings"

    # Each ambiguity warning comes from both the cached portion and the fresh
    # missing range; after deduplication every distinct message appears once.
    expected_service_name_warning = (
        "Key `service.name` is ambiguous, found 2 different combinations of "
        "field context / data type: [name=service.name,context=resource,datatype=string "
        "name=service.name,context=attribute,datatype=string]. Using `resource` context "
        "by default. To query attributes explicitly, use the fully qualified name "
        "(e.g., 'attribute.service.name')"
    )
    expected_status_code_warning = "Key `http.status_code` is ambiguous, found 2 different combinations of field context / data type: [name=http.status_code,context=attribute,datatype=number name=http.status_code,context=attribute,datatype=string]."
    assert warning["warnings"] == [
        {"message": expected_service_name_warning},
        {"message": expected_status_code_warning},
    ]
def test_deduped_warnings_for_multiple_queries(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[list[Logs]], None],
) -> None:
    """Two queries that raise the same warnings must report each one once.

    Queries "A" and "B" both filter on the ambiguous keys `service.name` and
    `http.status_code` within one request; the response is expected to list
    each distinct ambiguity warning exactly once.
    """
    log_body = "This is a log message, coming from a java application"
    # Ten logs spaced 10 minutes apart with a numeric status code ...
    numeric_status_logs = [
        Logs(
            timestamp=datetime.now(tz=UTC) - timedelta(minutes=i * 10),
            resources={"service.name": "java"},
            attributes={"service.name": "java", "http.status_code": 200},
            body=log_body,
            severity_text="DEBUG",
        )
        for i in range(10)
    ]
    # ... plus one log storing the same key as a string.
    string_status_log = Logs(
        timestamp=datetime.now(tz=UTC),
        resources={"service.name": "java"},
        attributes={"service.name": "java", "http.status_code": "200"},
        body=log_body,
        severity_text="DEBUG",
    )
    insert_logs([*numeric_status_logs, string_status_log])

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)

    # Both `service.name` (resource vs attribute) and `http.status_code`
    # (number vs string) are ambiguous keys, so using them in the filter makes
    # the querier emit an "is ambiguous" warning while *building* the query.
    query_specs = (
        ("A", "service.name = 'java' and http.status_code = 200"),
        ("B", "service.name != '_java' and http.status_code = 200"),
    )
    queries = [
        BuilderQuery(
            name=query_name,
            signal="logs",
            step_interval=600,
            filter_expression=filter_expression,
            aggregations=[Aggregation(expression="count()")],
        ).to_dict()
        for query_name, filter_expression in query_specs
    ]

    now_ms = int(datetime.now(tz=UTC).timestamp() * 1000)
    minute = 60 * 1000

    response = make_query_request(
        signoz,
        token,
        start_ms=now_ms - 90 * minute,
        end_ms=now_ms - 20 * minute,
        request_type=RequestType.TIME_SERIES,
        queries=queries,
        no_cache=False,
    )

    assert response.status_code == HTTPStatus.OK
    payload = response.json()
    assert payload["status"] == "success"

    warning = payload["data"].get("warning", None)
    assert warning is not None
    assert warning["message"] == "Encountered warnings"

    expected_service_name_warning = (
        "Key `service.name` is ambiguous, found 2 different combinations of "
        "field context / data type: [name=service.name,context=resource,datatype=string "
        "name=service.name,context=attribute,datatype=string]. Using `resource` context "
        "by default. To query attributes explicitly, use the fully qualified name "
        "(e.g., 'attribute.service.name')"
    )
    expected_status_code_warning = "Key `http.status_code` is ambiguous, found 2 different combinations of field context / data type: [name=http.status_code,context=attribute,datatype=number name=http.status_code,context=attribute,datatype=string]."
    # Exact list equality: each message once, in this order.
    assert warning["warnings"] == [
        {"message": expected_service_name_warning},
        {"message": expected_status_code_warning},
    ]
def test_no_warnings_for_meter_query(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_meter_samples: Callable[[list[MeterSample]], None],
) -> None:
    """A meter-source metrics query must not return any warning.

    Meter queries deliberately suppress the step-interval clamp warning. The
    minimum allowed step for a meter is 1h, so the 10m (600s) step used here
    would normally be clamped and produce a warning for a regular metric; for
    a meter source the querier discards it. The test asserts that no
    "warning" key leaks into the response data.
    """
    # Truncate to the minute so the sample and query timestamps are stable.
    now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
    metric_name = "signoz.meter.log.size"

    samples = make_meter_samples(
        metric_name,
        {"service": "test-service"},
        now,
        count=60,
        temporality="Delta",
        type_="Sum",
        is_monotonic=True,
    )
    insert_meter_samples(samples)

    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
    meter_query = BuilderQuery(
        name="A",
        signal="metrics",
        source="meter",
        step_interval=600,
        aggregations=[MetricAggregation(metric_name=metric_name, time_aggregation="sum", space_aggregation="sum")],
    ).to_dict()

    response = make_query_request(
        signoz,
        token,
        start_ms=int((now - timedelta(minutes=20)).timestamp() * 1000),
        end_ms=int(now.timestamp() * 1000),
        request_type=RequestType.TIME_SERIES,
        queries=[meter_query],
    )

    assert response.status_code == HTTPStatus.OK
    payload = response.json()
    assert payload["status"] == "success"
    assert "warning" not in payload["data"]