diff --git a/pkg/telemetrylogs/statement_builder.go b/pkg/telemetrylogs/statement_builder.go
index 33d50c8c9f..4075191472 100644
--- a/pkg/telemetrylogs/statement_builder.go
+++ b/pkg/telemetrylogs/statement_builder.go
@@ -140,6 +140,22 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []
 }
 
 func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] {
+
+	// Always ensure timestamp and id are present in keys map
+	keys["id"] = append([]*telemetrytypes.TelemetryFieldKey{{
+		Name:          "id",
+		Signal:        telemetrytypes.SignalLogs,
+		FieldContext:  telemetrytypes.FieldContextLog,
+		FieldDataType: telemetrytypes.FieldDataTypeString,
+	}}, keys["id"]...)
+
+	keys["timestamp"] = append([]*telemetrytypes.TelemetryFieldKey{{
+		Name:          "timestamp",
+		Signal:        telemetrytypes.SignalLogs,
+		FieldContext:  telemetrytypes.FieldContextLog,
+		FieldDataType: telemetrytypes.FieldDataTypeNumber,
+	}}, keys["timestamp"]...)
+
 	/*
 		Adjust keys for alias expressions in aggregations
 	*/
@@ -183,15 +199,6 @@ func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[stri
 		b.logger.InfoContext(ctx, "key adjustment action", "action", action)
 	}
 
-	keys["id"] = []*telemetrytypes.TelemetryFieldKey{
-		{
-			Name:          "id",
-			Signal:        telemetrytypes.SignalLogs,
-			FieldContext:  telemetrytypes.FieldContextLog,
-			FieldDataType: telemetrytypes.FieldDataTypeString,
-		},
-	}
-
 	return query
 }
 
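Note: the prepend in the first hunk replaces the hard overwrite of keys["id"] that the second hunk deletes, and extends the same treatment to "timestamp". A minimal sketch of why the ordering matters, using stand-in types instead of the real telemetrytypes package and a hypothetical first-match resolve helper: assuming the statement builder prefers the earliest entry, the intrinsic log column wins while a same-named user attribute stays reachable in the same slice.

    package main

    import "fmt"

    // Stand-in for telemetrytypes.TelemetryFieldKey; only the fields needed
    // to illustrate the ordering are kept.
    type fieldKey struct {
        name         string
        fieldContext string // "log" for intrinsic columns, "attribute" for user data
    }

    // resolve prefers the first candidate; prepending the intrinsic key is
    // what makes it win without discarding the user-defined one.
    func resolve(keys map[string][]*fieldKey, name string) *fieldKey {
        if candidates := keys[name]; len(candidates) > 0 {
            return candidates[0]
        }
        return nil
    }

    func main() {
        keys := map[string][]*fieldKey{
            "id": {
                {name: "id", fieldContext: "log"},       // prepended intrinsic key
                {name: "id", fieldContext: "attribute"}, // user-defined "id" attribute
            },
        }
        fmt.Println(resolve(keys, "id").fieldContext) // prints "log"
    }
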
diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go
index b568f82352..e4c627247c 100644
--- a/pkg/telemetrymetadata/metadata.go
+++ b/pkg/telemetrymetadata/metadata.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"log/slog"
-	"slices"
 	"strings"
 
 	"github.com/SigNoz/signoz/pkg/errors"
@@ -279,23 +278,6 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
 			}
 		}
 
-		// skip the keys that don't match data type
-		if field, exists := telemetrytraces.IntrinsicFields[key]; exists {
-			if len(dataTypes) > 0 &&
-				slices.Index(dataTypes, field.FieldDataType) == -1 &&
-				field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
-				continue
-			}
-		}
-
-		if field, exists := telemetrytraces.CalculatedFields[key]; exists {
-			if len(dataTypes) > 0 &&
-				slices.Index(dataTypes, field.FieldDataType) == -1 &&
-				field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
-				continue
-			}
-		}
-
 		if found {
 			if field, exists := telemetrytraces.IntrinsicFields[key]; exists {
 				if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
@@ -548,15 +530,6 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
 			}
 		}
 
-		// skip the keys that don't match data type
-		if field, exists := telemetrylogs.IntrinsicFields[key]; exists {
-			if len(dataTypes) > 0 &&
-				slices.Index(dataTypes, field.FieldDataType) == -1 &&
-				field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
-				continue
-			}
-		}
-
 		if found {
 			if field, exists := telemetrylogs.IntrinsicFields[key]; exists {
 				if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
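The hunks above drop the data-type guard for intrinsic and calculated fields (and the now-unused "slices" import). Reproduced in isolation with stand-in types, the deleted check looked like the sketch below; the real code used telemetrytypes.FieldDataType and slices.Index(...) == -1, which is equivalent to !slices.Contains. With the guard gone, known fields reach the found/mapOfKeys logic regardless of the data types the selector asked for.

    package main

    import (
        "fmt"
        "slices"
    )

    type fieldDataType string

    const typeUnspecified fieldDataType = ""

    // skipByDataType is the deleted guard in miniature: a known field was
    // dropped whenever the caller requested specific data types and the
    // field's declared type was not among them.
    func skipByDataType(requested []fieldDataType, fieldType fieldDataType) bool {
        return len(requested) > 0 &&
            !slices.Contains(requested, fieldType) &&
            fieldType != typeUnspecified
    }

    func main() {
        // A selector asking only for string keys used to hide a numeric
        // intrinsic field, such as a duration column, entirely.
        fmt.Println(skipByDataType([]fieldDataType{"string"}, "number")) // true
    }
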
diff --git a/pkg/telemetrytraces/statement_builder.go b/pkg/telemetrytraces/statement_builder.go
index 77134f547e..08112fbb30 100644
--- a/pkg/telemetrytraces/statement_builder.go
+++ b/pkg/telemetrytraces/statement_builder.go
@@ -162,6 +162,26 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation])
 }
 
 func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[string][]*telemetrytypes.TelemetryFieldKey, query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation], requestType qbtypes.RequestType) qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation] {
+	// add deprecated fields only during statement building
+	// why?
+	// 1. to not fail filter expressions that use deprecated cols
+	// 2. this could have been done during metadata fetching itself; however,
+	//    that would mean they also show up in suggestions, which we don't want
+	for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
+		if _, ok := keys[fieldKeyName]; !ok {
+			keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
+		} else {
+			keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
+		}
+	}
+	for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
+		if _, ok := keys[fieldKeyName]; !ok {
+			keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
+		} else {
+			keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
+		}
+	}
+
 	// Adjust keys for alias expressions in aggregations
 	actions := querybuilder.AdjustKeysForAliasExpressions(&query, requestType)
 
@@ -203,26 +223,6 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
 		b.logger.InfoContext(ctx, "key adjustment action", "action", action)
 	}
 
-	// add deprecated fields only during statement building
-	// why?
-	// 1. to not fail filter expression that use deprecated cols
-	// 2. this could have been moved to metadata fetching itself, however, that
-	//    would mean, they also show up in suggestions we we don't want to do
-	for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
-		if _, ok := keys[fieldKeyName]; !ok {
-			keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
-		} else {
-			keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
-		}
-	}
-	for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
-		if _, ok := keys[fieldKeyName]; !ok {
-			keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
-		} else {
-			keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
-		}
-	}
-
 	return query
 }
 
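Moving the deprecated-field merge ahead of AdjustKeysForAliasExpressions means the alias adjustment and everything after it already see the deprecated column names, presumably so filters referencing them compile like any other key. Two side notes, sketched below with stand-in types (the PR iterates the real IntrinsicFieldsDeprecated and CalculatedFieldsDeprecated maps): the if/else collapses to a single append, since appending to a nil slice allocates one, and taking &fieldKey of the range variable is only safe on Go 1.22+, where loop variables are per-iteration; copying first is safe everywhere.

    package main

    import "fmt"

    type fieldKey struct{ name string }

    // mergeDeprecated appends deprecated definitions after any live keys, so
    // they act as fallbacks rather than overrides.
    func mergeDeprecated(keys map[string][]*fieldKey, deprecated map[string]fieldKey) {
        for name, key := range deprecated {
            k := key // copy before taking the address (safe on any Go version)
            keys[name] = append(keys[name], &k)
        }
    }

    func main() {
        keys := map[string][]*fieldKey{}
        // "durationNano" is a hypothetical deprecated alias, used only for illustration.
        mergeDeprecated(keys, map[string]fieldKey{"durationNano": {name: "durationNano"}})
        fmt.Println(len(keys["durationNano"])) // 1: the old name resolves again
    }
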
diff --git a/tests/integration/fixtures/auth.py b/tests/integration/fixtures/auth.py
index 952a17322d..57b1cfdb08 100644
--- a/tests/integration/fixtures/auth.py
+++ b/tests/integration/fixtures/auth.py
@@ -20,9 +20,9 @@
 USER_ADMIN_NAME = "admin"
 USER_ADMIN_EMAIL = "admin@integration.test"
 USER_ADMIN_PASSWORD = "password123Z$"
-USER_EDITOR_NAME = 'editor'
-USER_EDITOR_EMAIL = 'editor@integration.test'
-USER_EDITOR_PASSWORD = 'password123Z$'
+USER_EDITOR_NAME = "editor"
+USER_EDITOR_EMAIL = "editor@integration.test"
+USER_EDITOR_PASSWORD = "password123Z$"
 
 
 @pytest.fixture(name="create_user_admin", scope="package")
diff --git a/tests/integration/src/passwordauthn/04_password.py b/tests/integration/src/passwordauthn/04_password.py
index 9739e9dfa4..0445e1af88 100644
--- a/tests/integration/src/passwordauthn/04_password.py
+++ b/tests/integration/src/passwordauthn/04_password.py
@@ -7,8 +7,6 @@ from sqlalchemy import sql
 
 from fixtures import types
 from fixtures.logger import setup_logger
-from datetime import datetime, timedelta, timezone
-
 
 logger = setup_logger(__name__)
 
@@ -243,6 +241,7 @@ def test_reset_password_with_no_password(
     token = get_token("admin+password@integration.test", "FINALPASSword123!#[")
     assert token is not None
 
+
 def test_forgot_password_returns_204_for_nonexistent_email(
     signoz: types.SigNoz,
 ) -> None:
@@ -292,7 +291,11 @@ def test_forgot_password_creates_reset_token(
     # Create a user specifically for testing forgot password
     response = requests.post(
         signoz.self.host_configs["8080"].get("/api/v1/invite"),
-        json={"email": "forgot@integration.test", "role": "EDITOR", "name": "forgotpassword user"},
+        json={
+            "email": "forgot@integration.test",
+            "role": "EDITOR",
+            "name": "forgotpassword user",
+        },
         timeout=2,
         headers={"Authorization": f"Bearer {admin_token}"},
     )
@@ -360,11 +363,7 @@
     assert response.status_code == HTTPStatus.OK
     user_response = response.json()["data"]
     found_user = next(
-        (
-            user
-            for user in user_response
-            if user["email"] == "forgot@integration.test"
-        ),
+        (user for user in user_response if user["email"] == "forgot@integration.test"),
         None,
     )
     assert found_user is not None
@@ -374,16 +373,20 @@
 
     # First get the password_id from factor_password, then get the token
     with signoz.sqlstore.conn.connect() as conn:
         result = conn.execute(
-            sql.text("""
+            sql.text(
+                """
                 SELECT rpt.token
                 FROM reset_password_token rpt
                 JOIN factor_password fp ON rpt.password_id = fp.id
                 WHERE fp.user_id = :user_id
-            """),
+            """
+            ),
             {"user_id": found_user["id"]},
         )
         row = result.fetchone()
-        assert row is not None, "Reset password token should exist after calling forgotPassword"
+        assert (
+            row is not None
+        ), "Reset password token should exist after calling forgotPassword"
         reset_token = row[0]
         assert reset_token is not None
@@ -426,11 +429,7 @@ def test_reset_password_with_expired_token(
     assert response.status_code == HTTPStatus.OK
     user_response = response.json()["data"]
     found_user = next(
-        (
-            user
-            for user in user_response
-            if user["email"] == "forgot@integration.test"
-        ),
+        (user for user in user_response if user["email"] == "forgot@integration.test"),
         None,
     )
     assert found_user is not None
@@ -464,12 +463,14 @@
     with signoz.sqlstore.conn.connect() as conn:
         # First get the token
         result = conn.execute(
-            sql.text("""
+            sql.text(
+                """
                 SELECT rpt.token, rpt.id
                 FROM reset_password_token rpt
                 JOIN factor_password fp ON rpt.password_id = fp.id
                 WHERE fp.user_id = :user_id
-            """),
+            """
+            ),
             {"user_id": found_user["id"]},
         )
         row = result.fetchone()
@@ -479,11 +480,13 @@
 
         # Now expire the token by setting expires_at to a past time
         conn.execute(
-            sql.text("""
+            sql.text(
+                """
                 UPDATE reset_password_token
                 SET expires_at = :expired_time
                 WHERE id = :token_id
-            """),
+            """
+            ),
             {
                 "expired_time": "2020-01-01 00:00:00",
                 "token_id": token_id,
{"code.line": 120} + + assert ( + rows[1]["data"]["body"] + == "This is a log message, coming from a java application" + ) + assert rows[1]["data"]["resources_string"] == { + "cloud.account.id": "001", + "cloud.provider": "integration", + "deployment.environment": "production", + "host.name": "linux-001", + "os.type": "linux", + "service.name": "java", + "timestamp": "2024-01-01T00:00:00Z", + } + assert rows[1]["data"]["attributes_string"] == { + "code.file": "/opt/Integration.java", + "code.function": "com.example.Integration.process", + "id": "1", + "log.iostream": "stdout", + "logtag": "F", + "telemetry.sdk.language": "java", + } + assert rows[1]["data"]["attributes_number"] == {"code.line": 120} + + @pytest.mark.parametrize( "order_by_context,expected_order", #### diff --git a/tests/integration/src/querier/04_traces.py b/tests/integration/src/querier/04_traces.py index eb48e499ae..fdacc77087 100644 --- a/tests/integration/src/querier/04_traces.py +++ b/tests/integration/src/querier/04_traces.py @@ -703,6 +703,183 @@ def test_traces_aggergate_order_by_count( assert series[0]["values"][0]["value"] == 4 +def test_traces_aggregate_with_mixed_field_selectors( + signoz: types.SigNoz, + create_user_admin: None, # pylint: disable=unused-argument + get_token: Callable[[str, str], str], + insert_traces: Callable[[List[Traces]], None], +) -> None: + """ + Setup: + Insert 4 traces with different attributes. + http-service: POST /integration -> SELECT, HTTP PATCH + topic-service: topic publish + + Tests: + 1. Query traces count for spans grouped by service.name + """ + http_service_trace_id = TraceIdGenerator.trace_id() + http_service_span_id = TraceIdGenerator.span_id() + http_service_db_span_id = TraceIdGenerator.span_id() + http_service_patch_span_id = TraceIdGenerator.span_id() + topic_service_trace_id = TraceIdGenerator.trace_id() + topic_service_span_id = TraceIdGenerator.span_id() + + now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0) + + insert_traces( + [ + Traces( + timestamp=now - timedelta(seconds=4), + duration=timedelta(seconds=3), + trace_id=http_service_trace_id, + span_id=http_service_span_id, + parent_span_id="", + name="POST /integration", + kind=TracesKind.SPAN_KIND_SERVER, + status_code=TracesStatusCode.STATUS_CODE_OK, + status_message="", + resources={ + "deployment.environment": "production", + "service.name": "http-service", + "os.type": "linux", + "host.name": "linux-000", + "cloud.provider": "integration", + "cloud.account.id": "000", + }, + attributes={ + "net.transport": "IP.TCP", + "http.scheme": "http", + "http.user_agent": "Integration Test", + "http.request.method": "POST", + "http.response.status_code": "200", + }, + ), + Traces( + timestamp=now - timedelta(seconds=3.5), + duration=timedelta(seconds=0.5), + trace_id=http_service_trace_id, + span_id=http_service_db_span_id, + parent_span_id=http_service_span_id, + name="SELECT", + kind=TracesKind.SPAN_KIND_CLIENT, + status_code=TracesStatusCode.STATUS_CODE_OK, + status_message="", + resources={ + "deployment.environment": "production", + "service.name": "http-service", + "os.type": "linux", + "host.name": "linux-000", + "cloud.provider": "integration", + "cloud.account.id": "000", + }, + attributes={ + "db.name": "integration", + "db.operation": "SELECT", + "db.statement": "SELECT * FROM integration", + }, + ), + Traces( + timestamp=now - timedelta(seconds=3), + duration=timedelta(seconds=1), + trace_id=http_service_trace_id, + span_id=http_service_patch_span_id, + 
diff --git a/tests/integration/src/querier/04_traces.py b/tests/integration/src/querier/04_traces.py
index eb48e499ae..fdacc77087 100644
--- a/tests/integration/src/querier/04_traces.py
+++ b/tests/integration/src/querier/04_traces.py
@@ -703,6 +703,183 @@ def test_traces_aggergate_order_by_count(
     assert series[0]["values"][0]["value"] == 4
 
 
+def test_traces_aggregate_with_mixed_field_selectors(
+    signoz: types.SigNoz,
+    create_user_admin: None,  # pylint: disable=unused-argument
+    get_token: Callable[[str, str], str],
+    insert_traces: Callable[[List[Traces]], None],
+) -> None:
+    """
+    Setup:
+        Insert 4 traces with different attributes.
+        http-service: POST /integration -> SELECT, HTTP PATCH
+        topic-service: topic publish
+
+    Tests:
+        1. Query p99/avg/count/countIf span aggregations grouped by service.name
+    """
+    http_service_trace_id = TraceIdGenerator.trace_id()
+    http_service_span_id = TraceIdGenerator.span_id()
+    http_service_db_span_id = TraceIdGenerator.span_id()
+    http_service_patch_span_id = TraceIdGenerator.span_id()
+    topic_service_trace_id = TraceIdGenerator.trace_id()
+    topic_service_span_id = TraceIdGenerator.span_id()
+
+    now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
+
+    insert_traces(
+        [
+            Traces(
+                timestamp=now - timedelta(seconds=4),
+                duration=timedelta(seconds=3),
+                trace_id=http_service_trace_id,
+                span_id=http_service_span_id,
+                parent_span_id="",
+                name="POST /integration",
+                kind=TracesKind.SPAN_KIND_SERVER,
+                status_code=TracesStatusCode.STATUS_CODE_OK,
+                status_message="",
+                resources={
+                    "deployment.environment": "production",
+                    "service.name": "http-service",
+                    "os.type": "linux",
+                    "host.name": "linux-000",
+                    "cloud.provider": "integration",
+                    "cloud.account.id": "000",
+                },
+                attributes={
+                    "net.transport": "IP.TCP",
+                    "http.scheme": "http",
+                    "http.user_agent": "Integration Test",
+                    "http.request.method": "POST",
+                    "http.response.status_code": "200",
+                },
+            ),
+            Traces(
+                timestamp=now - timedelta(seconds=3.5),
+                duration=timedelta(seconds=0.5),
+                trace_id=http_service_trace_id,
+                span_id=http_service_db_span_id,
+                parent_span_id=http_service_span_id,
+                name="SELECT",
+                kind=TracesKind.SPAN_KIND_CLIENT,
+                status_code=TracesStatusCode.STATUS_CODE_OK,
+                status_message="",
+                resources={
+                    "deployment.environment": "production",
+                    "service.name": "http-service",
+                    "os.type": "linux",
+                    "host.name": "linux-000",
+                    "cloud.provider": "integration",
+                    "cloud.account.id": "000",
+                },
+                attributes={
+                    "db.name": "integration",
+                    "db.operation": "SELECT",
+                    "db.statement": "SELECT * FROM integration",
+                },
+            ),
+            Traces(
+                timestamp=now - timedelta(seconds=3),
+                duration=timedelta(seconds=1),
+                trace_id=http_service_trace_id,
+                span_id=http_service_patch_span_id,
+                parent_span_id=http_service_span_id,
+                name="HTTP PATCH",
+                kind=TracesKind.SPAN_KIND_CLIENT,
+                status_code=TracesStatusCode.STATUS_CODE_OK,
+                status_message="",
+                resources={
+                    "deployment.environment": "production",
+                    "service.name": "http-service",
+                    "os.type": "linux",
+                    "host.name": "linux-000",
+                    "cloud.provider": "integration",
+                    "cloud.account.id": "000",
+                },
+                attributes={
+                    "http.request.method": "PATCH",
+                    "http.status_code": "404",
+                },
+            ),
+            Traces(
+                timestamp=now - timedelta(seconds=1),
+                duration=timedelta(seconds=4),
+                trace_id=topic_service_trace_id,
+                span_id=topic_service_span_id,
+                parent_span_id="",
+                name="topic publish",
+                kind=TracesKind.SPAN_KIND_PRODUCER,
+                status_code=TracesStatusCode.STATUS_CODE_OK,
+                status_message="",
+                resources={
+                    "deployment.environment": "production",
+                    "service.name": "topic-service",
+                    "os.type": "linux",
+                    "host.name": "linux-001",
+                    "cloud.provider": "integration",
+                    "cloud.account.id": "001",
+                },
+                attributes={
+                    "message.type": "SENT",
+                    "messaging.operation": "publish",
+                    "messaging.message.id": "001",
+                },
+            ),
+        ]
+    )
+
+    token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
+
+    query = {
+        "type": "builder_query",
+        "spec": {
+            "name": "A",
+            "signal": "traces",
+            "groupBy": [
+                {
+                    "name": "service.name",
+                    "fieldContext": "resource",
+                    "fieldDataType": "string",
+                }
+            ],
+            "aggregations": [
+                {"expression": "p99(duration_nano)", "alias": "p99"},
+                {"expression": "avg(duration_nano)", "alias": "avgDuration"},
+                {"expression": "count()", "alias": "numCalls"},
+                {"expression": "countIf(status_code = 2)", "alias": "numErrors"},
+                {
+                    "expression": "countIf(response_status_code >= 400 AND response_status_code < 500)",
+                    "alias": "num4XX",
+                },
+            ],
+            "order": [{"key": {"name": "count()"}, "direction": "desc"}],
+        },
+    }
+
+    # Query span aggregations grouped by service.name
+    response = make_query_request(
+        signoz,
+        token,
+        start_ms=int(
+            (datetime.now(tz=timezone.utc) - timedelta(minutes=5)).timestamp() * 1000
+        ),
+        end_ms=int(datetime.now(tz=timezone.utc).timestamp() * 1000),
+        request_type="time_series",
+        queries=[query],
+    )
+
+    assert response.status_code == HTTPStatus.OK
+    assert response.json()["status"] == "success"
+    results = response.json()["data"]["data"]["results"]
+    assert len(results) == 1
+    aggregations = results[0]["aggregations"]
+
+    assert (
+        aggregations[0]["series"][0]["values"][0]["value"] >= 2.5 * 1e9
+    )  # p99 for http-service
+
+
 def test_traces_fill_gaps(
     signoz: types.SigNoz,
     create_user_admin: None,  # pylint: disable=unused-argument