mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-08 10:49:56 +00:00
Compare commits
1 Commits
multiple-t
...
v0.110.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
391b3a34e1 |
3
.vscode/settings.json
vendored
3
.vscode/settings.json
vendored
@@ -11,5 +11,8 @@
|
||||
"[go]": {
|
||||
"editor.formatOnSave": true,
|
||||
"editor.defaultFormatter": "golang.go"
|
||||
},
|
||||
"[sql]": {
|
||||
"editor.defaultFormatter": "adpyke.vscode-sql-formatter"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,6 +134,12 @@ var (
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
"timestamp": {
|
||||
Name: "timestamp",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
}
|
||||
|
||||
CalculatedFields = map[string]telemetrytypes.TelemetryFieldKey{
|
||||
|
||||
@@ -188,6 +188,11 @@ func (m *defaultFieldMapper) getColumn(
|
||||
return indexV3Columns["attributes_bool"], nil
|
||||
}
|
||||
case telemetrytypes.FieldContextSpan, telemetrytypes.FieldContextUnspecified:
|
||||
/*
|
||||
TODO: This is incorrect, we cannot assume all unspecified context fields are span context.
|
||||
User could be referring to attributes, but we cannot fix this until we fix where_clause vistior
|
||||
https://github.com/SigNoz/signoz/pull/10102
|
||||
*/
|
||||
// Check if this is a span scope field
|
||||
if strings.ToLower(key.Name) == SpanSearchScopeRoot || strings.ToLower(key.Name) == SpanSearchScopeEntryPoint {
|
||||
// The actual SQL will be generated in the condition builder
|
||||
@@ -196,20 +201,28 @@ func (m *defaultFieldMapper) getColumn(
|
||||
|
||||
// TODO(srikanthccv): remove this when it's safe to remove
|
||||
// issue with CH aliasing
|
||||
|
||||
/*
|
||||
NOTE: There are fields which are deprecated for only to not show up as user suggestion and is possible that
|
||||
they don't have a mapping in oldToNew map. So we need to look up in indexV3Columns directly for those fields.
|
||||
For example: kind, timestamp etc.
|
||||
*/
|
||||
if _, ok := CalculatedFieldsDeprecated[key.Name]; ok {
|
||||
return indexV3Columns[oldToNew[key.Name]], nil
|
||||
// Check if we have a mapping for the deprecated calculated field
|
||||
if col, ok := indexV3Columns[oldToNew[key.Name]]; ok {
|
||||
return col, nil
|
||||
}
|
||||
}
|
||||
if _, ok := IntrinsicFieldsDeprecated[key.Name]; ok {
|
||||
// Check if we have a mapping for the deprecated intrinsic field
|
||||
if _, ok := indexV3Columns[oldToNew[key.Name]]; ok {
|
||||
return indexV3Columns[oldToNew[key.Name]], nil
|
||||
if col, ok := indexV3Columns[oldToNew[key.Name]]; ok {
|
||||
return col, nil
|
||||
}
|
||||
}
|
||||
|
||||
if col, ok := indexV3Columns[key.Name]; ok {
|
||||
return col, nil
|
||||
}
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
@@ -74,6 +74,41 @@ func (b *traceQueryStatementBuilder) Build(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
/*
|
||||
Adding a tech debt note here:
|
||||
This piece of code is a hot fix and should be removed once we close issue: engineering-pod/issues/3622
|
||||
*/
|
||||
/*
|
||||
-------------------------------- Start of tech debt ----------------------------
|
||||
*/
|
||||
if requestType == qbtypes.RequestTypeRaw {
|
||||
|
||||
selectedFields := query.SelectFields
|
||||
|
||||
if len(selectedFields) == 0 {
|
||||
sortedKeys := maps.Keys(DefaultFields)
|
||||
slices.Sort(sortedKeys)
|
||||
for _, key := range sortedKeys {
|
||||
selectedFields = append(selectedFields, DefaultFields[key])
|
||||
}
|
||||
query.SelectFields = selectedFields
|
||||
}
|
||||
|
||||
selectFieldKeys := []string{}
|
||||
for _, field := range selectedFields {
|
||||
selectFieldKeys = append(selectFieldKeys, field.Name)
|
||||
}
|
||||
|
||||
for _, x := range []string{"timestamp", "span_id", "trace_id"} {
|
||||
if !slices.Contains(selectFieldKeys, x) {
|
||||
query.SelectFields = append(query.SelectFields, DefaultFields[x])
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
-------------------------------- End of tech debt ----------------------------
|
||||
*/
|
||||
|
||||
query = b.adjustKeys(ctx, keys, query, requestType)
|
||||
|
||||
// Check if filter contains trace_id(s) and optimize time range if needed
|
||||
@@ -167,19 +202,13 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
|
||||
// 1. to not fail filter expression that use deprecated cols
|
||||
// 2. this could have been moved to metadata fetching itself, however, that
|
||||
// would mean, they also show up in suggestions we we don't want to do
|
||||
// 3. reason for not doing a simple append is to keep intrinsic/calculated field first so that it gets
|
||||
// priority in multi_if sql expression
|
||||
for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
|
||||
if _, ok := keys[fieldKeyName]; !ok {
|
||||
keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
|
||||
} else {
|
||||
keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
|
||||
}
|
||||
keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
|
||||
}
|
||||
for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
|
||||
if _, ok := keys[fieldKeyName]; !ok {
|
||||
keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
|
||||
} else {
|
||||
keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
|
||||
}
|
||||
keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
|
||||
}
|
||||
|
||||
// Adjust keys for alias expressions in aggregations
|
||||
@@ -282,29 +311,8 @@ func (b *traceQueryStatementBuilder) buildListQuery(
|
||||
cteArgs = append(cteArgs, args)
|
||||
}
|
||||
|
||||
selectedFields := query.SelectFields
|
||||
|
||||
if len(selectedFields) == 0 {
|
||||
sortedKeys := maps.Keys(DefaultFields)
|
||||
slices.Sort(sortedKeys)
|
||||
for _, key := range sortedKeys {
|
||||
selectedFields = append(selectedFields, DefaultFields[key])
|
||||
}
|
||||
}
|
||||
|
||||
selectFieldKeys := []string{}
|
||||
for _, field := range selectedFields {
|
||||
selectFieldKeys = append(selectFieldKeys, field.Name)
|
||||
}
|
||||
|
||||
for _, x := range []string{"timestamp", "span_id", "trace_id"} {
|
||||
if !slices.Contains(selectFieldKeys, x) {
|
||||
selectedFields = append(selectedFields, DefaultFields[x])
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: should we deprecate `SelectFields` and return everything from a span like we do for logs?
|
||||
for _, field := range selectedFields {
|
||||
for _, field := range query.SelectFields {
|
||||
colExpr, err := b.fm.ColumnExpressionFor(ctx, &field, keys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -483,7 +483,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
|
||||
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -698,6 +698,113 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
|
||||
keysMap map[string][]*telemetrytypes.TelemetryFieldKey
|
||||
expected qbtypes.Statement
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "List query with empty select fields",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
keysMap: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"timestamp": {
|
||||
{
|
||||
Name: "timestamp",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Filter: &qbtypes.Filter{},
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "List query with empty select fields and order by timestamp",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
keysMap: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"timestamp": {
|
||||
{
|
||||
Name: "timestamp",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Filter: &qbtypes.Filter{},
|
||||
Limit: 10,
|
||||
Order: []qbtypes.OrderBy{{
|
||||
Key: qbtypes.OrderByKey{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
},
|
||||
},
|
||||
Direction: qbtypes.OrderDirectionAsc,
|
||||
}},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = c.keysMap
|
||||
if mockMetadataStore.KeysMap == nil {
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
}
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
|
||||
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
|
||||
|
||||
statementBuilder := NewTraceQueryStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
resourceFilterStmtBuilder,
|
||||
aggExprRewriter,
|
||||
nil,
|
||||
)
|
||||
|
||||
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
|
||||
if c.expectedErr != nil {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), c.expectedErr.Error())
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expected.Query, q.Query)
|
||||
require.Equal(t, c.expected.Args, q.Args)
|
||||
require.Equal(t, c.expected.Warnings, q.Warnings)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatementBuilderTraceQuery(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Callable, Dict, List
|
||||
from typing import Any, Callable, Dict, List
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
@@ -14,7 +14,11 @@ from fixtures.querier import (
|
||||
make_query_request,
|
||||
)
|
||||
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
|
||||
from src.querier.util import assert_identical_query_response
|
||||
from src.querier.util import (
|
||||
assert_identical_query_response,
|
||||
format_timestamp,
|
||||
generate_traces_with_corrupt_metadata,
|
||||
)
|
||||
|
||||
|
||||
def test_traces_list(
|
||||
@@ -475,6 +479,234 @@ def test_traces_list(
|
||||
assert set(values) == set(["topic-service", "http-service"])
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"payload,status_code,results",
|
||||
[
|
||||
# Case 1: order by timestamp field which there in attributes as well
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": False,
|
||||
"order": [{"key": {"name": "timestamp"}, "direction": "desc"}],
|
||||
"limit": 1,
|
||||
},
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [
|
||||
x[3].duration_nano,
|
||||
x[3].name,
|
||||
x[3].response_status_code,
|
||||
x[3].service_name,
|
||||
x[3].span_id,
|
||||
format_timestamp(x[3].timestamp),
|
||||
x[3].trace_id,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 2: order by attribute timestamp field which is there in attributes as well
|
||||
# This should break but it doesn't because attribute.timestamp gets adjusted to timestamp
|
||||
# because of default trace.timestamp gets added by default and bug in field mapper picks
|
||||
# instrinsic field
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": False,
|
||||
"order": [
|
||||
{"key": {"name": "attribute.timestamp"}, "direction": "desc"}
|
||||
],
|
||||
"limit": 1,
|
||||
},
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [
|
||||
x[3].duration_nano,
|
||||
x[3].name,
|
||||
x[3].response_status_code,
|
||||
x[3].service_name,
|
||||
x[3].span_id,
|
||||
format_timestamp(x[3].timestamp),
|
||||
x[3].trace_id,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 3: select timestamp with empty order by
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": False,
|
||||
"selectFields": [{"name": "timestamp"}],
|
||||
"limit": 1,
|
||||
},
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [
|
||||
x[2].span_id,
|
||||
format_timestamp(x[2].timestamp),
|
||||
x[2].trace_id,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 4: select attribute.timestamp with empty order by
|
||||
# This doesn't return any data because of where_clause using aliased timestamp
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"filter": {"expression": "attribute.timestamp exists"},
|
||||
"disabled": False,
|
||||
"selectFields": [{"name": "attribute.timestamp"}],
|
||||
"limit": 1,
|
||||
},
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 5: select timestamp with timestamp order by
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": False,
|
||||
"selectFields": [{"name": "timestamp"}],
|
||||
"limit": 1,
|
||||
"order": [{"key": {"name": "timestamp"}, "direction": "asc"}],
|
||||
},
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [
|
||||
x[0].span_id,
|
||||
format_timestamp(x[0].timestamp),
|
||||
x[0].trace_id,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 6: select duration_nano with duration order by
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": False,
|
||||
"selectFields": [{"name": "duration_nano"}],
|
||||
"limit": 1,
|
||||
"order": [{"key": {"name": "duration_nano"}, "direction": "desc"}],
|
||||
},
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [
|
||||
x[1].duration_nano,
|
||||
x[1].span_id,
|
||||
format_timestamp(x[1].timestamp),
|
||||
x[1].trace_id,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 7: select attribute.duration_nano with attribute.duration_nano order by
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": False,
|
||||
"selectFields": [{"name": "attribute.duration_nano"}],
|
||||
"filter": {"expression": "attribute.duration_nano exists"},
|
||||
"limit": 1,
|
||||
"order": [
|
||||
{
|
||||
"key": {"name": "attribute.duration_nano"},
|
||||
"direction": "desc",
|
||||
}
|
||||
],
|
||||
},
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [
|
||||
"corrupt_data",
|
||||
x[3].span_id,
|
||||
format_timestamp(x[3].timestamp),
|
||||
x[3].trace_id,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 8: select attribute.duration_nano with duration order by
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": False,
|
||||
"selectFields": [{"name": "attribute.duration_nano"}],
|
||||
"limit": 1,
|
||||
"order": [{"key": {"name": "duration_nano"}, "direction": "desc"}],
|
||||
},
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [
|
||||
x[1].duration_nano,
|
||||
x[1].span_id,
|
||||
format_timestamp(x[1].timestamp),
|
||||
x[1].trace_id,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_traces_list_with_corrupt_data(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_traces: Callable[[List[Traces]], None],
|
||||
payload: Dict[str, Any],
|
||||
status_code: HTTPStatus,
|
||||
results: Callable[[List[Traces]], List[Any]],
|
||||
) -> None:
|
||||
"""
|
||||
Setup:
|
||||
Insert 4 traces with corrupt attributes.
|
||||
Tests:
|
||||
"""
|
||||
|
||||
traces = generate_traces_with_corrupt_metadata()
|
||||
insert_traces(traces)
|
||||
# 4 Traces with corrupt metadata inserted
|
||||
# traces[i] occured before traces[j] where i < j
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int(
|
||||
(datetime.now(tz=timezone.utc) - timedelta(minutes=5)).timestamp() * 1000
|
||||
),
|
||||
end_ms=int(datetime.now(tz=timezone.utc).timestamp() * 1000),
|
||||
request_type="raw",
|
||||
queries=[payload],
|
||||
)
|
||||
|
||||
assert response.status_code == status_code
|
||||
|
||||
if response.status_code == HTTPStatus.OK:
|
||||
|
||||
if not results(traces):
|
||||
# No results expected
|
||||
assert response.json()["data"]["data"]["results"][0]["rows"] is None
|
||||
else:
|
||||
data = response.json()["data"]["data"]["results"][0]["rows"][0]["data"]
|
||||
# Cannot compare values as they are randomly generated
|
||||
for key, value in zip(list(data.keys()), results(traces)):
|
||||
assert data[key] == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_by,aggregation_alias,expected_status",
|
||||
[
|
||||
|
||||
@@ -1,7 +1,25 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import List
|
||||
|
||||
import requests
|
||||
|
||||
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
|
||||
|
||||
|
||||
def format_timestamp(dt: datetime) -> str:
|
||||
"""
|
||||
Format a datetime object to match the API's timestamp format.
|
||||
The API returns timestamps with minimal fractional seconds precision.
|
||||
Example: 2026-02-03T20:54:56.5Z for 500000 microseconds
|
||||
"""
|
||||
base_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
if dt.microsecond:
|
||||
# Convert microseconds to fractional seconds and strip trailing zeros
|
||||
fractional = f"{dt.microsecond / 1000000:.6f}"[2:].rstrip("0")
|
||||
return f"{base_str}.{fractional}Z"
|
||||
return f"{base_str}Z"
|
||||
|
||||
|
||||
def assert_identical_query_response(
|
||||
response1: requests.Response, response2: requests.Response
|
||||
@@ -18,3 +36,125 @@ def assert_identical_query_response(
|
||||
response1.json()["data"]["data"]["results"]
|
||||
== response2.json()["data"]["data"]["results"]
|
||||
), "Response data do not match"
|
||||
|
||||
|
||||
def generate_traces_with_corrupt_metadata() -> List[Traces]:
|
||||
"""
|
||||
Specifically, entries with 'id', 'timestamp', 'trace_id' and 'duration_nano' fields in metadata
|
||||
"""
|
||||
http_service_trace_id = TraceIdGenerator.trace_id()
|
||||
http_service_span_id = TraceIdGenerator.span_id()
|
||||
http_service_db_span_id = TraceIdGenerator.span_id()
|
||||
http_service_patch_span_id = TraceIdGenerator.span_id()
|
||||
topic_service_trace_id = TraceIdGenerator.trace_id()
|
||||
topic_service_span_id = TraceIdGenerator.span_id()
|
||||
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
|
||||
return [
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=4),
|
||||
duration=timedelta(seconds=3),
|
||||
trace_id=http_service_trace_id,
|
||||
span_id=http_service_span_id,
|
||||
parent_span_id="",
|
||||
name="POST /integration",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={
|
||||
"deployment.environment": "production",
|
||||
"service.name": "http-service",
|
||||
"os.type": "linux",
|
||||
"host.name": "linux-000",
|
||||
"cloud.provider": "integration",
|
||||
"cloud.account.id": "000",
|
||||
"trace_id": "corrupt_data",
|
||||
},
|
||||
attributes={
|
||||
"net.transport": "IP.TCP",
|
||||
"http.scheme": "http",
|
||||
"http.user_agent": "Integration Test",
|
||||
"http.request.method": "POST",
|
||||
"http.response.status_code": "200",
|
||||
"timestamp": "corrupt_data",
|
||||
},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=3.5),
|
||||
duration=timedelta(seconds=5),
|
||||
trace_id=http_service_trace_id,
|
||||
span_id=http_service_db_span_id,
|
||||
parent_span_id=http_service_span_id,
|
||||
name="SELECT",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={
|
||||
"deployment.environment": "production",
|
||||
"service.name": "http-service",
|
||||
"os.type": "linux",
|
||||
"host.name": "linux-000",
|
||||
"cloud.provider": "integration",
|
||||
"cloud.account.id": "000",
|
||||
"timestamp": "corrupt_data",
|
||||
},
|
||||
attributes={
|
||||
"db.name": "integration",
|
||||
"db.operation": "SELECT",
|
||||
"db.statement": "SELECT * FROM integration",
|
||||
"trace_d": "corrupt_data",
|
||||
},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=3),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=http_service_trace_id,
|
||||
span_id=http_service_patch_span_id,
|
||||
parent_span_id=http_service_span_id,
|
||||
name="HTTP PATCH",
|
||||
kind=TracesKind.SPAN_KIND_CLIENT,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={
|
||||
"deployment.environment": "production",
|
||||
"service.name": "http-service",
|
||||
"os.type": "linux",
|
||||
"host.name": "linux-000",
|
||||
"cloud.provider": "integration",
|
||||
"cloud.account.id": "000",
|
||||
"duration_nano": "corrupt_data",
|
||||
},
|
||||
attributes={
|
||||
"http.request.method": "PATCH",
|
||||
"http.status_code": "404",
|
||||
"id": "1",
|
||||
},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=1),
|
||||
duration=timedelta(seconds=4),
|
||||
trace_id=topic_service_trace_id,
|
||||
span_id=topic_service_span_id,
|
||||
parent_span_id="",
|
||||
name="topic publish",
|
||||
kind=TracesKind.SPAN_KIND_PRODUCER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
status_message="",
|
||||
resources={
|
||||
"deployment.environment": "production",
|
||||
"service.name": "topic-service",
|
||||
"os.type": "linux",
|
||||
"host.name": "linux-001",
|
||||
"cloud.provider": "integration",
|
||||
"cloud.account.id": "001",
|
||||
},
|
||||
attributes={
|
||||
"message.type": "SENT",
|
||||
"messaging.operation": "publish",
|
||||
"messaging.message.id": "001",
|
||||
"duration_nano": "corrupt_data",
|
||||
"id": 1,
|
||||
},
|
||||
),
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user