fix: dont filter out static fields in metadata store (#10135)

This pull request introduces several improvements and bug fixes to the telemetry logs and traces statement builders, metadata key filtering, and integration tests. The most significant changes ensure that required fields are always present in queries, that deprecated fields are handled correctly, and that test coverage is extended to edge cases such as corrupt data. Some code cleanup and formatting improvements are also included. A simplified sketch of the key-handling change follows the file summary below.
This commit is contained in:
Tushar Vats
2026-01-28 19:42:04 +05:30
committed by GitHub
parent e7c812e07f
commit 189781748a
7 changed files with 407 additions and 79 deletions
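The first hunk below guarantees that the log statement builder always has intrinsic `id` and `timestamp` definitions in its keys map before any other key adjustment runs. The following is a minimal, self-contained sketch of that pattern; the `FieldKey` type and the `ensureRequiredLogKeys` helper are simplified stand-ins for illustration only (the real code works on `telemetrytypes.TelemetryFieldKey` inside `adjustKeys`, as shown in the diff):

```go
package main

import "fmt"

// FieldKey is a simplified stand-in for telemetrytypes.TelemetryFieldKey.
type FieldKey struct {
	Name          string
	FieldContext  string
	FieldDataType string
}

// ensureRequiredLogKeys mirrors the pattern in the diff: intrinsic definitions are
// prepended so they win over any same-named attribute keys that metadata lookup
// may have returned. (Hypothetical helper name.)
func ensureRequiredLogKeys(keys map[string][]*FieldKey) {
	keys["id"] = append([]*FieldKey{{Name: "id", FieldContext: "log", FieldDataType: "string"}}, keys["id"]...)
	keys["timestamp"] = append([]*FieldKey{{Name: "timestamp", FieldContext: "log", FieldDataType: "number"}}, keys["timestamp"]...)
}

func main() {
	// Simulate a metadata lookup that returned a conflicting attribute named "timestamp".
	keys := map[string][]*FieldKey{
		"timestamp": {{Name: "timestamp", FieldContext: "attribute", FieldDataType: "string"}},
	}
	ensureRequiredLogKeys(keys)
	// The intrinsic definition is now first, so ordering/selecting on timestamp and id
	// resolves to the log columns even when the metadata store returned nothing for them.
	fmt.Println(keys["timestamp"][0].FieldContext, keys["id"][0].Name) // log id
}
```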

View File

@@ -140,6 +140,22 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []
}
func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] {
// Always ensure timestamp and id are present in keys map
keys["id"] = append([]*telemetrytypes.TelemetryFieldKey{{
Name: "id",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
}}, keys["id"]...)
keys["timestamp"] = append([]*telemetrytypes.TelemetryFieldKey{{
Name: "timestamp",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
}}, keys["timestamp"]...)
/*
Adjust keys for alias expressions in aggregations
*/
@@ -183,15 +199,6 @@ func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[stri
b.logger.InfoContext(ctx, "key adjustment action", "action", action)
}
keys["id"] = []*telemetrytypes.TelemetryFieldKey{
{
Name: "id",
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
}
return query
}

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"log/slog" "log/slog"
"slices"
"strings" "strings"
"github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/errors"
@@ -279,23 +278,6 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
}
}
// skip the keys that don't match data type
if field, exists := telemetrytraces.IntrinsicFields[key]; exists {
if len(dataTypes) > 0 &&
slices.Index(dataTypes, field.FieldDataType) == -1 &&
field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
continue
}
}
if field, exists := telemetrytraces.CalculatedFields[key]; exists {
if len(dataTypes) > 0 &&
slices.Index(dataTypes, field.FieldDataType) == -1 &&
field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
continue
}
}
if found {
if field, exists := telemetrytraces.IntrinsicFields[key]; exists {
if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
@@ -548,15 +530,6 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
}
}
// skip the keys that don't match data type
if field, exists := telemetrylogs.IntrinsicFields[key]; exists {
if len(dataTypes) > 0 &&
slices.Index(dataTypes, field.FieldDataType) == -1 &&
field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
continue
}
}
if found {
if field, exists := telemetrylogs.IntrinsicFields[key]; exists {
if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
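The two removed blocks in this file are the core of the fix named in the commit title: intrinsic and calculated fields were skipped whenever the caller requested specific data types that did not include the field's declared type, so "static" fields disappeared from metadata responses. Below is a rough sketch of the predicate that was removed; the helper name and simplified types are hypothetical (the real code inlined this check per key):

```go
package main

import (
	"fmt"
	"slices"
)

type DataType string

// matchesRequested reproduces the filtering condition that was dropped: skip the key
// when the caller asked for specific data types and the field's declared type is not
// among them (unspecified types were always allowed through).
func matchesRequested(declared DataType, requested []DataType) bool {
	if len(requested) == 0 || declared == "unspecified" {
		return true
	}
	return slices.Contains(requested, declared)
}

func main() {
	// Before the fix: a caller asking only for string keys would never see an intrinsic
	// number field (for example duration_nano), so static fields vanished from metadata
	// responses. After the fix this predicate is simply no longer applied to intrinsic
	// and calculated fields.
	fmt.Println(matchesRequested("number", []DataType{"string"})) // false -> key was skipped
}
```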

View File

@@ -162,6 +162,26 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation])
func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation] {
// add deprecated fields only during statement building
// why?
// 1. to not fail filter expressions that use deprecated cols
// 2. this could have been moved to metadata fetching itself, however, that
// would mean they also show up in suggestions, which we don't want
for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
if _, ok := keys[fieldKeyName]; !ok {
keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
} else {
keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
}
}
for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
if _, ok := keys[fieldKeyName]; !ok {
keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
} else {
keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
}
}
// Adjust keys for alias expressions in aggregations
actions := querybuilder.AdjustKeysForAliasExpressions(&query, requestType)
@@ -203,26 +223,6 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
b.logger.InfoContext(ctx, "key adjustment action", "action", action)
}
// add deprecated fields only during statement building
// why?
// 1. to not fail filter expressions that use deprecated cols
// 2. this could have been moved to metadata fetching itself, however, that
// would mean they also show up in suggestions, which we don't want
for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
if _, ok := keys[fieldKeyName]; !ok {
keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
} else {
keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
}
}
for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
if _, ok := keys[fieldKeyName]; !ok {
keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
} else {
keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
}
}
return query
}
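This hunk moves the deprecated-field merge to the top of `adjustKeys`, so alias adjustment and the rest of statement building already see those entries, while metadata and suggestion responses (which never run this code) still omit them. A small sketch of the merge pattern follows, with simplified types; `mergeDeprecated` and the `httpMethod` entry are hypothetical examples, not the actual `IntrinsicFieldsDeprecated`/`CalculatedFieldsDeprecated` maps:

```go
package main

import "fmt"

type FieldKey struct{ Name string }

// mergeDeprecated appends deprecated column definitions into the keys map so that
// filter expressions referencing old column names still compile; since the metadata
// path never calls this, the deprecated names are not advertised in suggestions.
func mergeDeprecated(keys map[string][]*FieldKey, deprecated map[string]FieldKey) {
	for name, fk := range deprecated {
		fkCopy := fk // take the address of a per-entry copy, not the loop variable
		keys[name] = append(keys[name], &fkCopy)
	}
}

func main() {
	// Hypothetical deprecated alias kept only so old saved filters keep working.
	deprecated := map[string]FieldKey{"httpMethod": {Name: "httpMethod"}}
	keys := map[string][]*FieldKey{}
	mergeDeprecated(keys, deprecated)
	fmt.Println(len(keys["httpMethod"])) // 1
}
```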

View File

@@ -20,9 +20,9 @@ USER_ADMIN_NAME = "admin"
USER_ADMIN_EMAIL = "admin@integration.test" USER_ADMIN_EMAIL = "admin@integration.test"
USER_ADMIN_PASSWORD = "password123Z$" USER_ADMIN_PASSWORD = "password123Z$"
USER_EDITOR_NAME = 'editor' USER_EDITOR_NAME = "editor"
USER_EDITOR_EMAIL = 'editor@integration.test' USER_EDITOR_EMAIL = "editor@integration.test"
USER_EDITOR_PASSWORD = 'password123Z$' USER_EDITOR_PASSWORD = "password123Z$"
@pytest.fixture(name="create_user_admin", scope="package") @pytest.fixture(name="create_user_admin", scope="package")

View File

@@ -7,8 +7,6 @@ from sqlalchemy import sql
from fixtures import types
from fixtures.logger import setup_logger
from datetime import datetime, timedelta, timezone
logger = setup_logger(__name__)
@@ -243,6 +241,7 @@ def test_reset_password_with_no_password(
token = get_token("admin+password@integration.test", "FINALPASSword123!#[")
assert token is not None
def test_forgot_password_returns_204_for_nonexistent_email(
signoz: types.SigNoz,
) -> None:
@@ -292,7 +291,11 @@ def test_forgot_password_creates_reset_token(
# Create a user specifically for testing forgot password
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v1/invite"),
json={"email": "forgot@integration.test", "role": "EDITOR", "name": "forgotpassword user"},
json={
"email": "forgot@integration.test",
"role": "EDITOR",
"name": "forgotpassword user",
},
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
@@ -360,11 +363,7 @@ def test_forgot_password_creates_reset_token(
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(
(user for user in user_response if user["email"] == "forgot@integration.test"),
user
for user in user_response
if user["email"] == "forgot@integration.test"
),
None,
)
assert found_user is not None
@@ -374,16 +373,20 @@ def test_forgot_password_creates_reset_token(
# First get the password_id from factor_password, then get the token
with signoz.sqlstore.conn.connect() as conn:
result = conn.execute(
sql.text("""
sql.text(
"""
SELECT rpt.token
FROM reset_password_token rpt
JOIN factor_password fp ON rpt.password_id = fp.id
WHERE fp.user_id = :user_id
"""),
"""
),
{"user_id": found_user["id"]},
)
row = result.fetchone()
assert row is not None, "Reset password token should exist after calling forgotPassword"
assert (
row is not None
), "Reset password token should exist after calling forgotPassword"
reset_token = row[0]
assert reset_token is not None
@@ -426,11 +429,7 @@ def test_reset_password_with_expired_token(
assert response.status_code == HTTPStatus.OK
user_response = response.json()["data"]
found_user = next(
(
(user for user in user_response if user["email"] == "forgot@integration.test"),
user
for user in user_response
if user["email"] == "forgot@integration.test"
),
None,
)
assert found_user is not None
@@ -464,12 +463,14 @@ def test_reset_password_with_expired_token(
with signoz.sqlstore.conn.connect() as conn:
# First get the token
result = conn.execute(
sql.text("""
sql.text(
"""
SELECT rpt.token, rpt.id
FROM reset_password_token rpt
JOIN factor_password fp ON rpt.password_id = fp.id
WHERE fp.user_id = :user_id
"""),
"""
),
{"user_id": found_user["id"]},
)
row = result.fetchone()
@@ -479,11 +480,13 @@ def test_reset_password_with_expired_token(
# Now expire the token by setting expires_at to a past time
conn.execute(
sql.text("""
sql.text(
"""
UPDATE reset_password_token
SET expires_at = :expired_time
WHERE id = :token_id
"""),
"""
),
{
"expired_time": "2020-01-01 00:00:00",
"token_id": token_id,

View File

@@ -399,6 +399,174 @@ def test_logs_list(
assert "d-001" in values assert "d-001" in values
def test_logs_list_with_corrupt_data(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_logs: Callable[[List[Logs]], None],
) -> None:
"""
Setup:
Insert 2 logs with different attributes
Tests:
1. Query logs for the last 10 seconds and check if the logs are returned in the correct order
2. Query values of severity_text attribute from the autocomplete API
3. Query values of severity_text attribute from the fields API
4. Query values of code.file attribute from the autocomplete API
5. Query values of code.file attribute from the fields API
6. Query values of code.line attribute from the autocomplete API
7. Query values of code.line attribute from the fields API
"""
insert_logs(
[
Logs(
timestamp=datetime.now(tz=timezone.utc) - timedelta(seconds=1),
resources={
"deployment.environment": "production",
"service.name": "java",
"os.type": "linux",
"host.name": "linux-001",
"cloud.provider": "integration",
"cloud.account.id": "001",
"timestamp": "2024-01-01T00:00:00Z",
},
attributes={
"log.iostream": "stdout",
"logtag": "F",
"code.file": "/opt/Integration.java",
"code.function": "com.example.Integration.process",
"code.line": 120,
"telemetry.sdk.language": "java",
"id": "1",
},
body="This is a log message, coming from a java application",
severity_text="DEBUG",
),
Logs(
timestamp=datetime.now(tz=timezone.utc),
resources={
"deployment.environment": "production",
"service.name": "go",
"os.type": "linux",
"host.name": "linux-001",
"cloud.provider": "integration",
"cloud.account.id": "001",
"id": 2,
},
attributes={
"log.iostream": "stdout",
"logtag": "F",
"code.file": "/opt/integration.go",
"code.function": "com.example.Integration.process",
"code.line": 120,
"metric.domain_id": "d-001",
"telemetry.sdk.language": "go",
"timestamp": "invalid-timestamp",
},
body="This is a log message, coming from a go application",
severity_text="INFO",
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# Query Logs for the last 10 seconds and check if the logs are returned in the correct order
response = requests.post(
signoz.self.host_configs["8080"].get("/api/v5/query_range"),
timeout=2,
headers={
"authorization": f"Bearer {token}",
},
json={
"schemaVersion": "v1",
"start": int(
(datetime.now(tz=timezone.utc) - timedelta(seconds=10)).timestamp()
* 1000
),
"end": int(datetime.now(tz=timezone.utc).timestamp() * 1000),
"requestType": "raw",
"compositeQuery": {
"queries": [
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "logs",
"disabled": False,
"limit": 100,
"offset": 0,
"order": [
{"key": {"name": "timestamp"}, "direction": "desc"},
{"key": {"name": "id"}, "direction": "desc"},
],
"having": {"expression": ""},
"aggregations": [{"expression": "count()"}],
},
}
]
},
"formatOptions": {"formatTableResultForUI": False, "fillGaps": False},
},
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
rows = results[0]["rows"]
assert len(rows) == 2
assert (
rows[0]["data"]["body"] == "This is a log message, coming from a go application"
)
assert rows[0]["data"]["resources_string"] == {
"cloud.account.id": "001",
"cloud.provider": "integration",
"deployment.environment": "production",
"host.name": "linux-001",
"os.type": "linux",
"service.name": "go",
"id": "2",
}
assert rows[0]["data"]["attributes_string"] == {
"code.file": "/opt/integration.go",
"code.function": "com.example.Integration.process",
"log.iostream": "stdout",
"logtag": "F",
"metric.domain_id": "d-001",
"telemetry.sdk.language": "go",
"timestamp": "invalid-timestamp",
}
assert rows[0]["data"]["attributes_number"] == {"code.line": 120}
assert (
rows[1]["data"]["body"]
== "This is a log message, coming from a java application"
)
assert rows[1]["data"]["resources_string"] == {
"cloud.account.id": "001",
"cloud.provider": "integration",
"deployment.environment": "production",
"host.name": "linux-001",
"os.type": "linux",
"service.name": "java",
"timestamp": "2024-01-01T00:00:00Z",
}
assert rows[1]["data"]["attributes_string"] == {
"code.file": "/opt/Integration.java",
"code.function": "com.example.Integration.process",
"id": "1",
"log.iostream": "stdout",
"logtag": "F",
"telemetry.sdk.language": "java",
}
assert rows[1]["data"]["attributes_number"] == {"code.line": 120}
@pytest.mark.parametrize(
"order_by_context,expected_order",
####

View File

@@ -703,6 +703,183 @@ def test_traces_aggergate_order_by_count(
assert series[0]["values"][0]["value"] == 4 assert series[0]["values"][0]["value"] == 4
def test_traces_aggregate_with_mixed_field_selectors(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
) -> None:
"""
Setup:
Insert 4 traces with different attributes.
http-service: POST /integration -> SELECT, HTTP PATCH
topic-service: topic publish
Tests:
1. Query traces count for spans grouped by service.name
"""
http_service_trace_id = TraceIdGenerator.trace_id()
http_service_span_id = TraceIdGenerator.span_id()
http_service_db_span_id = TraceIdGenerator.span_id()
http_service_patch_span_id = TraceIdGenerator.span_id()
topic_service_trace_id = TraceIdGenerator.trace_id()
topic_service_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
insert_traces(
[
Traces(
timestamp=now - timedelta(seconds=4),
duration=timedelta(seconds=3),
trace_id=http_service_trace_id,
span_id=http_service_span_id,
parent_span_id="",
name="POST /integration",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "http-service",
"os.type": "linux",
"host.name": "linux-000",
"cloud.provider": "integration",
"cloud.account.id": "000",
},
attributes={
"net.transport": "IP.TCP",
"http.scheme": "http",
"http.user_agent": "Integration Test",
"http.request.method": "POST",
"http.response.status_code": "200",
},
),
Traces(
timestamp=now - timedelta(seconds=3.5),
duration=timedelta(seconds=0.5),
trace_id=http_service_trace_id,
span_id=http_service_db_span_id,
parent_span_id=http_service_span_id,
name="SELECT",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "http-service",
"os.type": "linux",
"host.name": "linux-000",
"cloud.provider": "integration",
"cloud.account.id": "000",
},
attributes={
"db.name": "integration",
"db.operation": "SELECT",
"db.statement": "SELECT * FROM integration",
},
),
Traces(
timestamp=now - timedelta(seconds=3),
duration=timedelta(seconds=1),
trace_id=http_service_trace_id,
span_id=http_service_patch_span_id,
parent_span_id=http_service_span_id,
name="HTTP PATCH",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "http-service",
"os.type": "linux",
"host.name": "linux-000",
"cloud.provider": "integration",
"cloud.account.id": "000",
},
attributes={
"http.request.method": "PATCH",
"http.status_code": "404",
},
),
Traces(
timestamp=now - timedelta(seconds=1),
duration=timedelta(seconds=4),
trace_id=topic_service_trace_id,
span_id=topic_service_span_id,
parent_span_id="",
name="topic publish",
kind=TracesKind.SPAN_KIND_PRODUCER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "topic-service",
"os.type": "linux",
"host.name": "linux-001",
"cloud.provider": "integration",
"cloud.account.id": "001",
},
attributes={
"message.type": "SENT",
"messaging.operation": "publish",
"messaging.message.id": "001",
},
),
]
)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
query = {
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"groupBy": [
{
"name": "service.name",
"fieldContext": "resource",
"fieldDataType": "string",
}
],
"aggregations": [
{"expression": "p99(duration_nano)", "alias": "p99"},
{"expression": "avg(duration_nano)", "alias": "avgDuration"},
{"expression": "count()", "alias": "numCalls"},
{"expression": "countIf(status_code = 2)", "alias": "numErrors"},
{
"expression": "countIf(response_status_code >= 400 AND response_status_code < 500)",
"alias": "num4XX",
},
],
"order": [{"key": {"name": "count()"}, "direction": "desc"}],
},
}
# Query traces count for spans
response = make_query_request(
signoz,
token,
start_ms=int(
(datetime.now(tz=timezone.utc) - timedelta(minutes=5)).timestamp() * 1000
),
end_ms=int(datetime.now(tz=timezone.utc).timestamp() * 1000),
request_type="time_series",
queries=[query],
)
assert response.status_code == HTTPStatus.OK
assert response.json()["status"] == "success"
results = response.json()["data"]["data"]["results"]
assert len(results) == 1
aggregations = results[0]["aggregations"]
assert (
aggregations[0]["series"][0]["values"][0]["value"] >= 2.5 * 1e9
) # p99 for http-service
def test_traces_fill_gaps(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument