Compare commits

...

20 Commits

Author SHA1 Message Date
Piyush Singariya
ac80e44782 Merge branch 'main' into fix/resource-query 2026-05-14 17:00:31 +05:30
Piyush Singariya
c141ac92c3 Merge branch 'main' into fix/resource-query 2026-05-14 16:59:00 +05:30
Piyush Singariya
9e3851af71 fix: comment fixed 2026-05-14 16:58:27 +05:30
Piyush Singariya
eb02171a81 Merge branch 'main' into fix/resource-query 2026-05-14 16:48:32 +05:30
Piyush Singariya
b0eceff9c6 fix: comment remove 2026-05-14 16:47:03 +05:30
Piyush Singariya
d8b61addd2 Merge branch 'main' into fix/resource-query 2026-05-14 16:28:44 +05:30
Piyush Singariya
e2927f6deb chore: bring in new fixture for building raw query 2026-05-14 16:28:13 +05:30
Piyush Singariya
9a8a70a66f fix: fmt py 2026-05-14 16:23:36 +05:30
Piyush Singariya
426095b713 chore: compressing tests into max 5 2026-05-14 16:19:14 +05:30
Piyush Singariya
c3058205b4 fix: uvx checks 2026-05-14 15:50:11 +05:30
Piyush Singariya
69e5977ab9 chore: comment fix 2026-05-14 15:33:22 +05:30
Piyush Singariya
19d04d005e chore: fmt py 2026-05-14 15:31:38 +05:30
Piyush Singariya
cee826f703 Merge branch 'main' into fix/resource-query 2026-05-14 15:30:21 +05:30
Piyush Singariya
5b9f864f6e chore: run non body tests in json enabled 2026-05-14 15:29:59 +05:30
Piyush Singariya
d24f0c13cc fix: package tests 2026-05-14 13:44:25 +05:30
Piyush Singariya
f70333630a test: add unit test for resource tags in json enabled flagger 2026-05-14 13:36:24 +05:30
Piyush Singariya
078b4c93d7 revert: stmt builder test changes 2026-05-14 13:19:30 +05:30
Piyush Singariya
9145f33ae8 Merge branch 'main' into fix/resource-query 2026-05-14 12:49:43 +05:30
Piyush Singariya
7bb67ba2cb fix: update test suite 2026-05-14 12:47:37 +05:30
Piyush Singariya
02311ede99 fix: query fix in conditionFor 2026-05-14 11:31:35 +05:30
5 changed files with 414 additions and 32 deletions

View File

@@ -41,7 +41,7 @@ func (c *conditionBuilder) conditionFor(
// TODO(Piyush): Update this to support multiple JSON columns based on evolutions
for _, column := range columns {
// TODO(Tushar): thread orgID here to evaluate correctly
if column.Type.GetType() == schema.ColumnTypeEnumJSON && c.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(valuer.UUID{})) && key.Name != messageSubField {
if column.Type.GetType() == schema.ColumnTypeEnumJSON && key.FieldContext == telemetrytypes.FieldContextBody && c.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(valuer.UUID{})) && key.Name != messageSubField {
valueType, value := InferDataType(value, operator, key)
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
if err != nil {

View File

@@ -33,7 +33,7 @@ func (t TestExpected) GetQuery() string {
}
func TestJSONStmtBuilder_TimeSeries(t *testing.T) {
statementBuilder := buildJSONTestStatementBuilder(t, false)
statementBuilder, _ := buildJSONTestStatementBuilder(t, false)
cases := []struct {
name string
@@ -171,7 +171,7 @@ func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) {
*/
func TestJSONStmtBuilder_PrimitivePaths(t *testing.T) {
statementBuilder := buildJSONTestStatementBuilder(t, false)
statementBuilder, _ := buildJSONTestStatementBuilder(t, false)
cases := []struct {
name string
filter string
@@ -494,7 +494,7 @@ func TestStatementBuilderListQueryBodyPromoted(t *testing.T) {
*/
func TestJSONStmtBuilder_ArrayPaths(t *testing.T) {
statementBuilder := buildJSONTestStatementBuilder(t, false)
statementBuilder, _ := buildJSONTestStatementBuilder(t, false)
cases := []struct {
name string
filter string
@@ -799,7 +799,7 @@ func TestJSONStmtBuilder_ArrayPaths(t *testing.T) {
}
func TestJSONStmtBuilder_IndexedPaths(t *testing.T) {
statementBuilder := buildJSONTestStatementBuilder(t, true)
statementBuilder, _ := buildJSONTestStatementBuilder(t, true)
cases := []struct {
name string
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
@@ -918,7 +918,7 @@ func TestJSONStmtBuilder_IndexedPaths(t *testing.T) {
}
func TestJSONStmtBuilder_SelectField(t *testing.T) {
statementBuilder := buildJSONTestStatementBuilder(t, false)
statementBuilder, _ := buildJSONTestStatementBuilder(t, false)
cases := []struct {
name string
@@ -1006,7 +1006,7 @@ func TestJSONStmtBuilder_SelectField(t *testing.T) {
}
func TestJSONStmtBuilder_OrderBy(t *testing.T) {
statementBuilder := buildJSONTestStatementBuilder(t, false)
statementBuilder, _ := buildJSONTestStatementBuilder(t, false)
cases := []struct {
name string
@@ -1082,6 +1082,69 @@ func TestJSONStmtBuilder_OrderBy(t *testing.T) {
}
}
// TestResourceAggrAndGroupBy_WithJSONEnabled checks the statement produced for
// a time-series logs query that groups by `region`, aggregates
// `count_distinct(service.name)` and filters on `user.name exists`.
//
// The expected SQL pins three things: the group-by and aggregation keys are
// read from the `resource` column, the filter key is looked up in both
// `body_v2` and `attributes_string`, and an ambiguity warning is reported for
// `user.name`.
func TestResourceAggrAndGroupBy_WithJSONEnabled(t *testing.T) {
	statementBuilder, metadataStore := buildJSONTestStatementBuilder(t, false)

	// Register every key of the complete field-key map with the mock store
	// before building any statement.
	for _, fieldKeys := range buildCompleteFieldKeyMap(time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)) {
		for _, fieldKey := range fieldKeys {
			metadataStore.SetKey(fieldKey)
		}
	}

	type testCase struct {
		name                string
		requestType         qbtypes.RequestType
		query               qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
		expected            qbtypes.Statement
		expectedErrContains string
	}

	cases := []testCase{
		{
			name:        "resource_aggregation_and_group_by_with_json_enabled",
			requestType: qbtypes.RequestTypeTimeSeries,
			query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
				Signal:       telemetrytypes.SignalLogs,
				StepInterval: qbtypes.Step{Duration: 30 * time.Second},
				GroupBy: []qbtypes.GroupByKey{
					{
						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
							Name: "region",
						},
					},
				},
				Filter: &qbtypes.Filter{
					Expression: "user.name exists",
				},
				Aggregations: []qbtypes.LogAggregation{
					{
						Expression: "count_distinct(service.name)",
					},
				},
			},
			expected: qbtypes.Statement{
				Query:    "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(resource.`region`::String IS NOT NULL, resource.`region`::String, NULL)) AS `region`, countDistinct(multiIf(resource.`service.name`::String IS NOT NULL, resource.`service.name`::String, NULL)) AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE ((dynamicElement(body_v2.`user.name`, 'String') IS NOT NULL) OR mapContains(attributes_string, 'user.name') = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY ts, `region`",
				Args:     []any{true, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
				Warnings: []string{"Key `user.name` is ambiguous, found 2 different combinations of field context / data type: [name=user.name,context=body,datatype=string name=user.name,context=attribute,datatype=string]."},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			stmt, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, tc.requestType, tc.query, nil)

			// Error cases only assert on the error text.
			if tc.expectedErrContains != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), tc.expectedErrContains)
				return
			}

			require.NoError(t, err)
			require.Equal(t, tc.expected.Query, stmt.Query)
			require.Equal(t, tc.expected.Args, stmt.Args)
			require.Equal(t, tc.expected.Warnings, stmt.Warnings)
		})
	}
}
func buildTestTelemetryMetadataStore(t *testing.T, addIndexes bool) *telemetrytypestest.MockMetadataStore {
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.SetStaticFields(IntrinsicFields)
@@ -1123,7 +1186,7 @@ func buildTestTelemetryMetadataStore(t *testing.T, addIndexes bool) *telemetryty
return mockMetadataStore
}
func buildJSONTestStatementBuilder(t *testing.T, addIndexes bool) *logQueryStatementBuilder {
func buildJSONTestStatementBuilder(t *testing.T, addIndexes bool) (*logQueryStatementBuilder, *telemetrytypestest.MockMetadataStore) {
t.Helper()
mockMetadataStore := buildTestTelemetryMetadataStore(t, addIndexes)
@@ -1144,5 +1207,5 @@ func buildJSONTestStatementBuilder(t *testing.T, addIndexes bool) *logQueryState
fl,
)
return statementBuilder
return statementBuilder, mockMetadataStore
}

View File

@@ -450,6 +450,35 @@ def build_scalar_query(
return {"type": "builder_query", "spec": spec}
def build_raw_query(
    name: str,
    signal: str,
    *,
    order: list[dict] | None = None,
    limit: int | None = None,
    filter_expression: str | None = None,
    step_interval: int = DEFAULT_STEP_INTERVAL,
    disabled: bool = False,
) -> dict:
    """Build a ``builder_query`` payload without aggregations or group-by.

    Args:
        name: Value stored under the spec's ``name`` key.
        signal: Value stored under the spec's ``signal`` key.
        order: Order-by entries; omitted from the spec when empty or None.
        limit: Row limit; omitted from the spec only when None (0 is kept).
        filter_expression: Filter text, wrapped as ``{"expression": ...}``;
            omitted from the spec when empty or None.
        step_interval: Value stored under ``stepInterval``.
        disabled: Value stored under ``disabled``.

    Returns:
        ``{"type": "builder_query", "spec": {...}}``.
    """
    optional_fields: dict[str, Any] = {}
    if order:
        optional_fields["order"] = order
    if limit is not None:
        optional_fields["limit"] = limit
    if filter_expression:
        optional_fields["filter"] = {"expression": filter_expression}

    return {
        "type": "builder_query",
        "spec": {
            "name": name,
            "signal": signal,
            "stepInterval": step_interval,
            "disabled": disabled,
            **optional_fields,
        },
    }
def build_group_by_field(
name: str,
field_data_type: str = "string",

View File

@@ -11,6 +11,7 @@ from fixtures.logs import Logs
from fixtures.querier import (
build_logs_aggregation,
build_order_by,
build_raw_query,
build_scalar_query,
get_column_data_from_response,
get_rows,
@@ -27,28 +28,33 @@ def _run_query_case(signoz: types.SigNoz, token: str, now: datetime, case: dict[
start_ms = case.get("startMs", int((now - timedelta(seconds=10)).timestamp() * 1000))
end_ms = case.get("endMs", int(now.timestamp() * 1000))
aggregation = case.get("aggregation")
if aggregation and not isinstance(aggregation, list):
aggregations = [build_logs_aggregation(aggregation)]
elif aggregation:
aggregations = aggregation
if case["requestType"] == "raw":
query = build_raw_query(
name=case["name"],
signal="logs",
filter_expression=case.get("expression"),
order=case.get("order") or [build_order_by("timestamp", "desc")],
limit=case.get("limit", 100),
step_interval=case.get("stepInterval") or 60,
)
else:
aggregations = []
order = case.get("order")
if order is None and case["requestType"] == "raw":
order = [build_order_by("timestamp", "desc")]
query = build_scalar_query(
name=case["name"],
signal="logs",
aggregations=aggregations,
group_by=case.get("groupBy"),
order=order,
limit=case.get("limit", 100),
filter_expression=case.get("expression"),
step_interval=case.get("stepInterval") or 60,
)
aggregation = case.get("aggregation")
if aggregation and not isinstance(aggregation, list):
aggregations = [build_logs_aggregation(aggregation)]
elif aggregation:
aggregations = aggregation
else:
aggregations = []
query = build_scalar_query(
name=case["name"],
signal="logs",
aggregations=aggregations,
group_by=case.get("groupBy"),
order=case.get("order"),
limit=case.get("limit", 100),
filter_expression=case.get("expression"),
step_interval=case.get("stepInterval") or 60,
)
response = make_query_request(
signoz=signoz,
@@ -636,10 +642,9 @@ def test_select_order_by(
end_ms = int(now.timestamp() * 1000)
def _run(case: dict[str, Any]) -> None:
query = build_scalar_query(
query = build_raw_query(
name=case["name"],
signal="logs",
aggregations=[build_logs_aggregation("count()")],
order=case["order"],
limit=100,
step_interval=60,

View File

@@ -0,0 +1,285 @@
import json
from collections.abc import Callable
from datetime import UTC, datetime, timedelta
from typing import Any
import requests
from fixtures import types
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
from fixtures.logs import Logs
from fixtures.querier import (
build_group_by_field,
build_logs_aggregation,
build_order_by,
build_raw_query,
build_scalar_query,
get_rows,
get_scalar_table_data,
make_query_request,
)
def _raw(
    signoz: types.SigNoz,
    token: str,
    start_ms: int,
    end_ms: int,
    name: str,
    *,
    expression: str | None = None,
    order: list[dict] | None = None,
    limit: int = 100,
) -> requests.Response:
    """Send one raw logs query and return the response after asserting HTTP 200.

    When ``order`` is empty or None the query is ordered by ``timestamp``
    descending. The step interval is fixed at 60.
    """
    effective_order = order if order else [build_order_by("timestamp", "desc")]
    query = build_raw_query(
        name=name,
        signal="logs",
        filter_expression=expression,
        order=effective_order,
        limit=limit,
        step_interval=60,
    )
    response = make_query_request(
        signoz,
        token,
        start_ms,
        end_ms,
        queries=[query],
        request_type="raw",
    )
    assert response.status_code == 200, f"HTTP {response.status_code} for '{name}': {response.text}"
    return response
def _scalar(
    signoz: types.SigNoz,
    token: str,
    start_ms: int,
    end_ms: int,
    name: str,
    aggregation: str,
    *,
    expression: str | None = None,
    group_by: list[dict] | None = None,
) -> requests.Response:
    """Send one scalar logs query and return the response after asserting HTTP 200.

    ``aggregation`` is a single aggregation expression; it is wrapped with
    ``build_logs_aggregation`` and sent as the only aggregation. The step
    interval is fixed at 60.
    """
    query = build_scalar_query(
        name=name,
        signal="logs",
        aggregations=[build_logs_aggregation(aggregation)],
        filter_expression=expression,
        group_by=group_by,
        step_interval=60,
    )
    response = make_query_request(
        signoz,
        token,
        start_ms,
        end_ms,
        queries=[query],
        request_type="scalar",
    )
    assert response.status_code == 200, f"HTTP {response.status_code} for '{name}': {response.text}"
    return response
def _body_users(response: requests.Response) -> set[str | None]:
    """Return the set of ``user`` values found in the JSON body of each row.

    A row whose decoded body has no ``user`` key contributes ``None``.
    """
    users: set[str | None] = set()
    for row in get_rows(response):
        decoded_body = json.loads(row["data"]["body"])
        users.add(decoded_body.get("user"))
    return users
def _body_scores(response: requests.Response) -> list[int | None]:
    """Return the ``score`` value of each row's JSON body, in row order.

    A row whose decoded body has no ``score`` key contributes ``None``.
    """
    scores: list[int | None] = []
    for row in get_rows(response):
        decoded_body = json.loads(row["data"]["body"])
        scores.append(decoded_body.get("score"))
    return scores
def _services(response: requests.Response) -> list[str]:
    """Return each row's ``service.name`` resource value, in row order.

    Rows without that key in ``resources_string`` contribute an empty string.
    """
    names: list[str] = []
    for row in get_rows(response):
        resource_attrs = row["data"]["resources_string"]
        names.append(resource_attrs.get("service.name", ""))
    return names
def _counts(response: requests.Response) -> dict[str, Any]:
    """Map the first column of each scalar table row (as ``str``) to its last column.

    Empty rows are skipped. If two rows share a first column, the later row wins.
    """
    by_first_column: dict[str, Any] = {}
    for row in get_scalar_table_data(response.json()):
        if not row:
            continue
        by_first_column[str(row[0])] = row[-1]
    return by_first_column
def _run_case(
    signoz: types.SigNoz,
    token: str,
    start_ms: int,
    end_ms: int,
    case: dict[str, Any],
) -> None:
    """Execute one table-driven case and assert its ``validate`` callback passes.

    Cases whose ``requestType`` is ``"raw"`` go through ``_raw`` (using the
    optional ``expression`` and ``order`` keys). Every other case goes through
    ``_scalar`` and must supply ``aggregation``; ``expression`` and ``groupBy``
    are optional.
    """
    case_name = case["name"]
    filter_text = case.get("expression")

    if case["requestType"] == "raw":
        response = _raw(signoz, token, start_ms, end_ms, case_name, expression=filter_text, order=case.get("order"))
    else:
        response = _scalar(
            signoz,
            token,
            start_ms,
            end_ms,
            case_name,
            case["aggregation"],
            expression=filter_text,
            group_by=case.get("groupBy"),
        )

    assert case["validate"](response), f"Validation failed for '{case['name']}': {response.json()}"
# ============================================================================
# Filter · GroupBy · Aggregation — non-body fields across all three contexts
#
# Five cases, one dataset. Each case crosses a different combination of
# resource attr / log attr / top-level field in WHERE, GROUP BY, and agg:
#
# case 1 filter resource + log attr + top-level in WHERE (raw)
# case 2 group by resource × top-level multi-key (scalar)
# case 3 aggregation count_distinct(log attr) grouped by top-level (scalar)
# case 4 agg+filter count by resource, body-field WHERE guard (scalar)
# case 5 agg+filter count_distinct(resource) by log attr, top-level filter (scalar)
#
# Data landscape (5 logs):
# log1 — auth-svc, GET, INFO, score=80, user=alice
# log2 — auth-svc, POST, ERROR, score=90, user=bob
# log3 — auth-svc, GET, INFO, score=60, user=carol
# log4 — api-gw, GET, WARN, score=70, user=diana
# log5 — worker, DELETE, ERROR, score=100, user=eve
# ============================================================================
def test_non_body_filter_groupby_aggregation(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[list[Logs]], None],
    export_json_types: Callable[[list[Logs]], None],
) -> None:
    """Exercise filter, group-by and aggregation on non-body fields.

    Five logs are inserted one second apart, the newest at ``now - 1s``. Five
    cases are then run against a ``[now - 10s, now]`` window, each mixing
    resource attributes, log attributes and top-level fields differently.
    """
    now = datetime.now(tz=UTC)
    window_start = now - timedelta(seconds=10)
    start_ms = int(window_start.timestamp() * 1000)
    end_ms = int(now.timestamp() * 1000)

    # (service.name, http.method, severity_text, body)
    log_data = [
        ("auth-svc", "GET", "INFO", {"score": 80, "user": "alice"}),
        ("auth-svc", "POST", "ERROR", {"score": 90, "user": "bob"}),
        ("auth-svc", "GET", "INFO", {"score": 60, "user": "carol"}),
        ("api-gw", "GET", "WARN", {"score": 70, "user": "diana"}),
        ("worker", "DELETE", "ERROR", {"score": 100, "user": "eve"}),
    ]

    # Ages count down from len(log_data) to 1 second, so the first entry is the oldest.
    logs_list = []
    for age_seconds, (svc, method, sev, body) in zip(range(len(log_data), 0, -1), log_data):
        logs_list.append(
            Logs(
                timestamp=now - timedelta(seconds=age_seconds),
                resources={"service.name": svc},
                attributes={"http.method": method},
                body_v2=json.dumps(body),
                body_promoted="",
                severity_text=sev,
            )
        )

    export_json_types(logs_list)
    insert_logs(logs_list)
    token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)

    def _cross_context_ok(r: requests.Response) -> bool:
        # Only alice and carol are auth-svc + GET + INFO.
        return len(get_rows(r)) == 2 and _body_users(r) == {"alice", "carol"}

    def _service_severity_pairs_ok(r: requests.Response) -> bool:
        # auth-svc+INFO=2, auth-svc+ERROR=1, api-gw+WARN=1, worker+ERROR=1
        pairs = {}
        for row in get_scalar_table_data(r.json()):
            if len(row) >= 3:
                pairs[(str(row[0]), str(row[1]))] = row[-1]
        if not pairs:
            return False
        return (
            pairs.get(("auth-svc", "INFO")) == 2
            and pairs.get(("auth-svc", "ERROR")) == 1
            and pairs.get(("api-gw", "WARN")) == 1
            and pairs.get(("worker", "ERROR")) == 1
        )

    def _distinct_methods_per_severity_ok(r: requests.Response) -> bool:
        # ERROR logs use {POST, DELETE}; INFO and WARN logs use only GET.
        rows = _counts(r)
        if not rows:
            return False
        return int(rows["INFO"]) == 1 and int(rows["ERROR"]) == 2 and int(rows["WARN"]) == 1

    def _high_score_per_service_ok(r: requests.Response) -> bool:
        # score>=80: alice(80), bob(90), eve(100) -> auth-svc: 2, worker: 1; api-gw excluded
        rows = _counts(r)
        if not rows:
            return False
        return int(rows["auth-svc"]) == 2 and int(rows["worker"]) == 1 and "api-gw" not in rows

    def _distinct_services_per_method_ok(r: requests.Response) -> bool:
        # INFO/WARN logs: GET(auth-svc x2, api-gw) -> 2 distinct services; POST/DELETE excluded
        rows = _counts(r)
        if not rows:
            return False
        return int(rows["GET"]) == 2 and "POST" not in rows and "DELETE" not in rows

    cases = [
        # 1. Filter: resource + log attr + top-level in WHERE (all three non-body contexts at once)
        {
            "name": "filter.cross_context",
            "requestType": "raw",
            "expression": 'service.name = "auth-svc" AND http.method = "GET" AND severity_text = "INFO"',
            "validate": _cross_context_ok,
        },
        # 2. GroupBy: resource x top-level multi-key, no filter
        {
            "name": "groupby.resource_x_toplevel",
            "requestType": "scalar",
            "expression": None,
            "groupBy": [build_group_by_field("service.name"), {"name": "severity_text"}],
            "aggregation": "count()",
            "validate": _service_severity_pairs_ok,
        },
        # 3. Aggregation: count_distinct(log attr) grouped by top-level
        {
            "name": "agg.count_distinct_attr_by_toplevel",
            "requestType": "scalar",
            "expression": None,
            "groupBy": [{"name": "severity_text"}],
            "aggregation": "count_distinct(http.method)",
            "validate": _distinct_methods_per_severity_ok,
        },
        # 4. Aggregation + body filter: count by resource WHERE body score >= 80
        {
            "name": "agg.count_by_resource_body_filter",
            "requestType": "scalar",
            "expression": "score >= 80",
            "groupBy": [build_group_by_field("service.name")],
            "aggregation": "count()",
            "validate": _high_score_per_service_ok,
        },
        # 5. Aggregation + top-level filter: count_distinct(resource) grouped by log attr
        {
            "name": "agg.count_distinct_resource_by_attr_toplevel_filter",
            "requestType": "scalar",
            "expression": "severity_text IN ['INFO', 'WARN']",
            "groupBy": [{"name": "http.method"}],
            "aggregation": "count_distinct(service.name)",
            "validate": _distinct_services_per_method_ok,
        },
    ]

    for case in cases:
        case.setdefault("groupBy", None)
        _run_case(signoz, token, start_ms, end_ms, case)
# ============================================================================
# OrderBy — non-body fields as primary sort keys
#
# Four cases cover every non-body context as the primary ORDER BY key:
# orderby.service_asc resource attr (service.name ASC)
# orderby.timestamp_desc top-level (timestamp DESC)
# orderby.severity_asc top-level (severity_text ASC)
# orderby.multi_method_then_score log attr primary, body path secondary
#
# Data landscape:
# log1 — svc-a, GET, INFO, score=80, ts=now-4s
# log2 — svc-a, POST, INFO, score=90, ts=now-3s
# log3 — svc-b, GET, WARN, score=60, ts=now-2s
# log4 — svc-b, DELETE, WARN, score=70, ts=now-1s
# ============================================================================
def test_non_body_orderby(
    signoz: types.SigNoz,
    create_user_admin: None,  # pylint: disable=unused-argument
    get_token: Callable[[str, str], str],
    insert_logs: Callable[[list[Logs]], None],
    export_json_types: Callable[[list[Logs]], None],
) -> None:
    """Exercise ORDER BY on non-body fields in raw log queries.

    Four logs are inserted at ``now - 4s`` through ``now - 1s``. Each case
    sorts by a different primary key and checks the resulting row order over a
    ``[now - 10s, now]`` window.
    """
    now = datetime.now(tz=UTC)
    window_start = now - timedelta(seconds=10)
    start_ms = int(window_start.timestamp() * 1000)
    end_ms = int(now.timestamp() * 1000)

    # (age in seconds, service.name, http.method, severity_text, body score)
    log_specs = [
        (4, "svc-a", "GET", "INFO", 80),
        (3, "svc-a", "POST", "INFO", 90),
        (2, "svc-b", "GET", "WARN", 60),
        (1, "svc-b", "DELETE", "WARN", 70),
    ]
    logs_list = [
        Logs(
            timestamp=now - timedelta(seconds=age_seconds),
            resources={"service.name": service},
            attributes={"http.method": method},
            body_v2=json.dumps({"score": score}),
            body_promoted="",
            severity_text=severity,
        )
        for age_seconds, service, method, severity, score in log_specs
    ]

    export_json_types(logs_list)
    insert_logs(logs_list)
    token = get_token(email=USER_ADMIN_EMAIL, password=USER_ADMIN_PASSWORD)

    def _svc_a_rows_before_svc_b(r: requests.Response) -> bool:
        # Both svc-a rows must precede both svc-b rows.
        if len(get_rows(r)) != 4:
            return False
        return _services(r)[:2] == ["svc-a", "svc-a"] and _services(r)[2:] == ["svc-b", "svc-b"]

    def _newest_first(r: requests.Response) -> bool:
        # ts-1s(svc-b/70), ts-2s(svc-b/60), ts-3s(svc-a/90), ts-4s(svc-a/80)
        if len(get_rows(r)) != 4:
            return False
        return _body_scores(r) == [70, 60, 90, 80] and _services(r) == ["svc-b", "svc-b", "svc-a", "svc-a"]

    def _method_then_score(r: requests.Response) -> bool:
        # DELETE < GET < POST alphabetically; within GET scores go 60 -> 80
        return len(get_rows(r)) == 4 and _body_scores(r) == [70, 60, 80, 90]

    cases = [
        # resource attr ASC: svc-a x2 before svc-b x2
        {
            "name": "orderby.service_asc",
            "requestType": "raw",
            "order": [build_order_by("service.name", "asc")],
            "validate": _svc_a_rows_before_svc_b,
        },
        # top-level timestamp DESC
        {
            "name": "orderby.timestamp_desc",
            "requestType": "raw",
            "order": [build_order_by("timestamp", "desc")],
            "validate": _newest_first,
        },
        # top-level severity_text ASC: INFO (svc-a x2) before WARN (svc-b x2)
        {
            "name": "orderby.severity_asc",
            "requestType": "raw",
            "order": [build_order_by("severity_text", "asc")],
            "validate": _svc_a_rows_before_svc_b,
        },
        # multi-key: http.method ASC then score ASC -> DELETE(70), GET(60, 80), POST(90)
        {
            "name": "orderby.multi_method_then_score",
            "requestType": "raw",
            "order": [build_order_by("http.method", "asc"), build_order_by("score", "asc")],
            "validate": _method_then_score,
        },
    ]

    for case in cases:
        case.setdefault("groupBy", None)
        _run_case(signoz, token, start_ms, end_ms, case)