Compare commits

...

2 Commits

Author SHA1 Message Date
Tushar Vats
799fd53d24 fix: reverted changes to field mapper and added more integration tests 2026-02-04 14:25:57 +05:30
Tushar Vats
b4ac83aeac fix: handle traces intrinsic fields 2026-02-04 14:25:44 +05:30
7 changed files with 548 additions and 39 deletions

View File

@@ -11,5 +11,8 @@
"[go]": {
"editor.formatOnSave": true,
"editor.defaultFormatter": "golang.go"
},
"[sql]": {
"editor.defaultFormatter": "adpyke.vscode-sql-formatter"
}
}

View File

@@ -134,6 +134,12 @@ var (
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
"timestamp": {
Name: "timestamp",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextSpan,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
}
CalculatedFields = map[string]telemetrytypes.TelemetryFieldKey{

View File

@@ -188,6 +188,11 @@ func (m *defaultFieldMapper) getColumn(
return indexV3Columns["attributes_bool"], nil
}
case telemetrytypes.FieldContextSpan, telemetrytypes.FieldContextUnspecified:
/*
TODO: This is incorrect, we cannot assume all unspecified context fields are span context.
User could be referring to attributes, but we cannot fix this until we fix where_clause vistior
https://github.com/SigNoz/signoz/pull/10102
*/
// Check if this is a span scope field
if strings.ToLower(key.Name) == SpanSearchScopeRoot || strings.ToLower(key.Name) == SpanSearchScopeEntryPoint {
// The actual SQL will be generated in the condition builder
@@ -196,20 +201,28 @@ func (m *defaultFieldMapper) getColumn(
// TODO(srikanthccv): remove this when it's safe to remove
// issue with CH aliasing
/*
NOTE: There are fields which are deprecated for only to not show up as user suggestion and is possible that
they don't have a mapping in oldToNew map. So we need to look up in indexV3Columns directly for those fields.
For example: kind, timestamp etc.
*/
if _, ok := CalculatedFieldsDeprecated[key.Name]; ok {
return indexV3Columns[oldToNew[key.Name]], nil
// Check if we have a mapping for the deprecated calculated field
if col, ok := indexV3Columns[oldToNew[key.Name]]; ok {
return col, nil
}
}
if _, ok := IntrinsicFieldsDeprecated[key.Name]; ok {
// Check if we have a mapping for the deprecated intrinsic field
if _, ok := indexV3Columns[oldToNew[key.Name]]; ok {
return indexV3Columns[oldToNew[key.Name]], nil
if col, ok := indexV3Columns[oldToNew[key.Name]]; ok {
return col, nil
}
}
if col, ok := indexV3Columns[key.Name]; ok {
return col, nil
}
return nil, qbtypes.ErrColumnNotFound
}
return nil, qbtypes.ErrColumnNotFound
}

View File

@@ -74,6 +74,41 @@ func (b *traceQueryStatementBuilder) Build(
return nil, err
}
/*
Adding a tech debt note here:
This piece of code is a hot fix and should be removed once we close issue: engineering-pod/issues/3622
*/
/*
-------------------------------- Start of tech debt ----------------------------
*/
if requestType == qbtypes.RequestTypeRaw {
selectedFields := query.SelectFields
if len(selectedFields) == 0 {
sortedKeys := maps.Keys(DefaultFields)
slices.Sort(sortedKeys)
for _, key := range sortedKeys {
selectedFields = append(selectedFields, DefaultFields[key])
}
query.SelectFields = selectedFields
}
selectFieldKeys := []string{}
for _, field := range selectedFields {
selectFieldKeys = append(selectFieldKeys, field.Name)
}
for _, x := range []string{"timestamp", "span_id", "trace_id"} {
if !slices.Contains(selectFieldKeys, x) {
query.SelectFields = append(query.SelectFields, DefaultFields[x])
}
}
}
/*
-------------------------------- End of tech debt ----------------------------
*/
query = b.adjustKeys(ctx, keys, query, requestType)
// Check if filter contains trace_id(s) and optimize time range if needed
@@ -167,19 +202,13 @@ func (b *traceQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[st
// 1. to not fail filter expression that use deprecated cols
// 2. this could have been moved to metadata fetching itself, however, that
// would mean, they also show up in suggestions we we don't want to do
// 3. reason for not doing a simple append is to keep intrinsic/calculated field first so that it gets
// priority in multi_if sql expression
for fieldKeyName, fieldKey := range IntrinsicFieldsDeprecated {
if _, ok := keys[fieldKeyName]; !ok {
keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
} else {
keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
}
keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
}
for fieldKeyName, fieldKey := range CalculatedFieldsDeprecated {
if _, ok := keys[fieldKeyName]; !ok {
keys[fieldKeyName] = []*telemetrytypes.TelemetryFieldKey{&fieldKey}
} else {
keys[fieldKeyName] = append(keys[fieldKeyName], &fieldKey)
}
keys[fieldKeyName] = append([]*telemetrytypes.TelemetryFieldKey{&fieldKey}, keys[fieldKeyName]...)
}
// Adjust keys for alias expressions in aggregations
@@ -282,29 +311,8 @@ func (b *traceQueryStatementBuilder) buildListQuery(
cteArgs = append(cteArgs, args)
}
selectedFields := query.SelectFields
if len(selectedFields) == 0 {
sortedKeys := maps.Keys(DefaultFields)
slices.Sort(sortedKeys)
for _, key := range sortedKeys {
selectedFields = append(selectedFields, DefaultFields[key])
}
}
selectFieldKeys := []string{}
for _, field := range selectedFields {
selectFieldKeys = append(selectFieldKeys, field.Name)
}
for _, x := range []string{"timestamp", "span_id", "trace_id"} {
if !slices.Contains(selectFieldKeys, x) {
selectedFields = append(selectedFields, DefaultFields[x])
}
}
// TODO: should we deprecate `SelectFields` and return everything from a span like we do for logs?
for _, field := range selectedFields {
for _, field := range query.SelectFields {
colExpr, err := b.fm.ColumnExpressionFor(ctx, &field, keys)
if err != nil {
return nil, err

View File

@@ -483,7 +483,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND true AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
@@ -698,6 +698,113 @@ func TestStatementBuilderListQuery(t *testing.T) {
}
}
func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
keysMap map[string][]*telemetrytypes.TelemetryFieldKey
expected qbtypes.Statement
expectedErr error
}{
{
name: "List query with empty select fields",
requestType: qbtypes.RequestTypeRaw,
keysMap: map[string][]*telemetrytypes.TelemetryFieldKey{
"timestamp": {
{
Name: "timestamp",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
},
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Filter: &qbtypes.Filter{},
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "List query with empty select fields and order by timestamp",
requestType: qbtypes.RequestTypeRaw,
keysMap: map[string][]*telemetrytypes.TelemetryFieldKey{
"timestamp": {
{
Name: "timestamp",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
},
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Filter: &qbtypes.Filter{},
Limit: 10,
Order: []qbtypes.OrderBy{{
Key: qbtypes.OrderByKey{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
},
},
Direction: qbtypes.OrderDirectionAsc,
}},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = c.keysMap
if mockMetadataStore.KeysMap == nil {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
}
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil,
)
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}
func TestStatementBuilderTraceQuery(t *testing.T) {
cases := []struct {
name string

View File

@@ -1,6 +1,6 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Callable, Dict, List
from typing import Any, Callable, Dict, List
import pytest
import requests
@@ -14,7 +14,11 @@ from fixtures.querier import (
make_query_request,
)
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
from src.querier.util import assert_identical_query_response
from src.querier.util import (
assert_identical_query_response,
format_timestamp,
generate_traces_with_corrupt_metadata,
)
def test_traces_list(
@@ -475,6 +479,234 @@ def test_traces_list(
assert set(values) == set(["topic-service", "http-service"])
@pytest.mark.parametrize(
"payload,status_code,results",
[
# Case 1: order by timestamp field which there in attributes as well
pytest.param(
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"disabled": False,
"order": [{"key": {"name": "timestamp"}, "direction": "desc"}],
"limit": 1,
},
},
HTTPStatus.OK,
lambda x: [
x[3].duration_nano,
x[3].name,
x[3].response_status_code,
x[3].service_name,
x[3].span_id,
format_timestamp(x[3].timestamp),
x[3].trace_id,
], # type: Callable[[List[Traces]], List[Any]]
),
# Case 2: order by attribute timestamp field which is there in attributes as well
# This should break but it doesn't because attribute.timestamp gets adjusted to timestamp
# because of default trace.timestamp gets added by default and bug in field mapper picks
# instrinsic field
pytest.param(
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"disabled": False,
"order": [
{"key": {"name": "attribute.timestamp"}, "direction": "desc"}
],
"limit": 1,
},
},
HTTPStatus.OK,
lambda x: [
x[3].duration_nano,
x[3].name,
x[3].response_status_code,
x[3].service_name,
x[3].span_id,
format_timestamp(x[3].timestamp),
x[3].trace_id,
], # type: Callable[[List[Traces]], List[Any]]
),
# Case 3: select timestamp with empty order by
pytest.param(
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"disabled": False,
"selectFields": [{"name": "timestamp"}],
"limit": 1,
},
},
HTTPStatus.OK,
lambda x: [
x[2].span_id,
format_timestamp(x[2].timestamp),
x[2].trace_id,
], # type: Callable[[List[Traces]], List[Any]]
),
# Case 4: select attribute.timestamp with empty order by
# This doesn't return any data because of where_clause using aliased timestamp
pytest.param(
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"filter": {"expression": "attribute.timestamp exists"},
"disabled": False,
"selectFields": [{"name": "attribute.timestamp"}],
"limit": 1,
},
},
HTTPStatus.OK,
lambda x: [], # type: Callable[[List[Traces]], List[Any]]
),
# Case 5: select timestamp with timestamp order by
pytest.param(
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"disabled": False,
"selectFields": [{"name": "timestamp"}],
"limit": 1,
"order": [{"key": {"name": "timestamp"}, "direction": "asc"}],
},
},
HTTPStatus.OK,
lambda x: [
x[0].span_id,
format_timestamp(x[0].timestamp),
x[0].trace_id,
], # type: Callable[[List[Traces]], List[Any]]
),
# Case 6: select duration_nano with duration order by
pytest.param(
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"disabled": False,
"selectFields": [{"name": "duration_nano"}],
"limit": 1,
"order": [{"key": {"name": "duration_nano"}, "direction": "desc"}],
},
},
HTTPStatus.OK,
lambda x: [
x[1].duration_nano,
x[1].span_id,
format_timestamp(x[1].timestamp),
x[1].trace_id,
], # type: Callable[[List[Traces]], List[Any]]
),
# Case 7: select attribute.duration_nano with attribute.duration_nano order by
pytest.param(
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"disabled": False,
"selectFields": [{"name": "attribute.duration_nano"}],
"filter": {"expression": "attribute.duration_nano exists"},
"limit": 1,
"order": [
{
"key": {"name": "attribute.duration_nano"},
"direction": "desc",
}
],
},
},
HTTPStatus.OK,
lambda x: [
"corrupt_data",
x[3].span_id,
format_timestamp(x[3].timestamp),
x[3].trace_id,
], # type: Callable[[List[Traces]], List[Any]]
),
# Case 8: select attribute.duration_nano with duration order by
pytest.param(
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"disabled": False,
"selectFields": [{"name": "attribute.duration_nano"}],
"limit": 1,
"order": [{"key": {"name": "duration_nano"}, "direction": "desc"}],
},
},
HTTPStatus.OK,
lambda x: [
x[1].duration_nano,
x[1].span_id,
format_timestamp(x[1].timestamp),
x[1].trace_id,
], # type: Callable[[List[Traces]], List[Any]]
),
],
)
def test_traces_list_with_corrupt_data(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[List[Traces]], None],
payload: Dict[str, Any],
status_code: HTTPStatus,
results: Callable[[List[Traces]], List[Any]],
) -> None:
"""
Setup:
Insert 4 traces with corrupt attributes.
Tests:
"""
traces = generate_traces_with_corrupt_metadata()
insert_traces(traces)
# 4 Traces with corrupt metadata inserted
# traces[i] occured before traces[j] where i < j
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = make_query_request(
signoz,
token,
start_ms=int(
(datetime.now(tz=timezone.utc) - timedelta(minutes=5)).timestamp() * 1000
),
end_ms=int(datetime.now(tz=timezone.utc).timestamp() * 1000),
request_type="raw",
queries=[payload],
)
assert response.status_code == status_code
if response.status_code == HTTPStatus.OK:
if not results(traces):
# No results expected
assert response.json()["data"]["data"]["results"][0]["rows"] is None
else:
data = response.json()["data"]["data"]["results"][0]["rows"][0]["data"]
# Cannot compare values as they are randomly generated
for key, value in zip(list(data.keys()), results(traces)):
assert data[key] == value
@pytest.mark.parametrize(
"order_by,aggregation_alias,expected_status",
[

View File

@@ -1,7 +1,25 @@
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import List
import requests
from fixtures.traces import TraceIdGenerator, Traces, TracesKind, TracesStatusCode
def format_timestamp(dt: datetime) -> str:
"""
Format a datetime object to match the API's timestamp format.
The API returns timestamps with minimal fractional seconds precision.
Example: 2026-02-03T20:54:56.5Z for 500000 microseconds
"""
base_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
if dt.microsecond:
# Convert microseconds to fractional seconds and strip trailing zeros
fractional = f"{dt.microsecond / 1000000:.6f}"[2:].rstrip("0")
return f"{base_str}.{fractional}Z"
return f"{base_str}Z"
def assert_identical_query_response(
response1: requests.Response, response2: requests.Response
@@ -18,3 +36,125 @@ def assert_identical_query_response(
response1.json()["data"]["data"]["results"]
== response2.json()["data"]["data"]["results"]
), "Response data do not match"
def generate_traces_with_corrupt_metadata() -> List[Traces]:
"""
Specifically, entries with 'id', 'timestamp', 'trace_id' and 'duration_nano' fields in metadata
"""
http_service_trace_id = TraceIdGenerator.trace_id()
http_service_span_id = TraceIdGenerator.span_id()
http_service_db_span_id = TraceIdGenerator.span_id()
http_service_patch_span_id = TraceIdGenerator.span_id()
topic_service_trace_id = TraceIdGenerator.trace_id()
topic_service_span_id = TraceIdGenerator.span_id()
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
return [
Traces(
timestamp=now - timedelta(seconds=4),
duration=timedelta(seconds=3),
trace_id=http_service_trace_id,
span_id=http_service_span_id,
parent_span_id="",
name="POST /integration",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "http-service",
"os.type": "linux",
"host.name": "linux-000",
"cloud.provider": "integration",
"cloud.account.id": "000",
"trace_id": "corrupt_data",
},
attributes={
"net.transport": "IP.TCP",
"http.scheme": "http",
"http.user_agent": "Integration Test",
"http.request.method": "POST",
"http.response.status_code": "200",
"timestamp": "corrupt_data",
},
),
Traces(
timestamp=now - timedelta(seconds=3.5),
duration=timedelta(seconds=5),
trace_id=http_service_trace_id,
span_id=http_service_db_span_id,
parent_span_id=http_service_span_id,
name="SELECT",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "http-service",
"os.type": "linux",
"host.name": "linux-000",
"cloud.provider": "integration",
"cloud.account.id": "000",
"timestamp": "corrupt_data",
},
attributes={
"db.name": "integration",
"db.operation": "SELECT",
"db.statement": "SELECT * FROM integration",
"trace_d": "corrupt_data",
},
),
Traces(
timestamp=now - timedelta(seconds=3),
duration=timedelta(seconds=1),
trace_id=http_service_trace_id,
span_id=http_service_patch_span_id,
parent_span_id=http_service_span_id,
name="HTTP PATCH",
kind=TracesKind.SPAN_KIND_CLIENT,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "http-service",
"os.type": "linux",
"host.name": "linux-000",
"cloud.provider": "integration",
"cloud.account.id": "000",
"duration_nano": "corrupt_data",
},
attributes={
"http.request.method": "PATCH",
"http.status_code": "404",
"id": "1",
},
),
Traces(
timestamp=now - timedelta(seconds=1),
duration=timedelta(seconds=4),
trace_id=topic_service_trace_id,
span_id=topic_service_span_id,
parent_span_id="",
name="topic publish",
kind=TracesKind.SPAN_KIND_PRODUCER,
status_code=TracesStatusCode.STATUS_CODE_OK,
status_message="",
resources={
"deployment.environment": "production",
"service.name": "topic-service",
"os.type": "linux",
"host.name": "linux-001",
"cloud.provider": "integration",
"cloud.account.id": "001",
},
attributes={
"message.type": "SENT",
"messaging.operation": "publish",
"messaging.message.id": "001",
"duration_nano": "corrupt_data",
"id": 1,
},
),
]