mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-24 17:10:31 +01:00
Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b4b2d7bb66 | ||
|
|
e16416475b | ||
|
|
0ea7c1ae6e | ||
|
|
a023c8ed4a | ||
|
|
a73ae62cd1 | ||
|
|
ec6fb58052 | ||
|
|
d3d13eb7ff | ||
|
|
782de2b210 | ||
|
|
d3c38693f3 | ||
|
|
8791df3697 | ||
|
|
eb719c3d0d | ||
|
|
f10435c210 | ||
|
|
f3f1e9cb59 | ||
|
|
d0370ce3ef | ||
|
|
d169761e65 | ||
|
|
87864ef5d4 | ||
|
|
2e0bc8998e | ||
|
|
7e1f4aa50d | ||
|
|
35da39247c | ||
|
|
ceccc47a34 | ||
|
|
23da5e22ec | ||
|
|
4c1b479149 | ||
|
|
f72204a8b2 | ||
|
|
deb3f385fa | ||
|
|
77ce5f86b1 | ||
|
|
ff211de441 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -231,4 +231,5 @@ cython_debug/
|
||||
# LSP config files
|
||||
pyrightconfig.json
|
||||
|
||||
|
||||
# agents
|
||||
.claude/settings.local.json
|
||||
|
||||
@@ -56,6 +56,17 @@ func QueryStringToKeysSelectors(query string) []*telemetrytypes.FieldKeySelector
|
||||
FieldDataType: key.FieldDataType,
|
||||
})
|
||||
}
|
||||
|
||||
// todo(tushar): consider reverting changes done to this method in below PR to avoid scope specific checks
|
||||
// https://github.com/SigNoz/signoz/issues/11374
|
||||
if key.FieldContext == telemetrytypes.FieldContextScope {
|
||||
keys = append(keys, &telemetrytypes.FieldKeySelector{
|
||||
Name: key.FieldContext.StringValue() + "." + key.Name,
|
||||
Signal: key.Signal,
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified, // this allows 'scope.' prefix for keys with other context as well
|
||||
FieldDataType: key.FieldDataType,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -72,6 +72,23 @@ func TestQueryToKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
query: `scope.version = '1.0.0'`,
|
||||
expectedKeys: []telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "version",
|
||||
Signal: telemetrytypes.SignalUnspecified,
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
{
|
||||
Name: "scope.version",
|
||||
Signal: telemetrytypes.SignalUnspecified,
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
|
||||
@@ -200,7 +200,7 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
|
||||
`CASE
|
||||
// WHEN tagType = 'spanfield' THEN 1
|
||||
WHEN tagType = 'resource' THEN 2
|
||||
// WHEN tagType = 'scope' THEN 3
|
||||
WHEN tagType = 'scope' THEN 3
|
||||
WHEN tagType = 'tag' THEN 4
|
||||
ELSE 5
|
||||
END as priority`,
|
||||
|
||||
@@ -51,6 +51,7 @@ var (
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}},
|
||||
"resource": {Name: "resource", Type: schema.JSONColumnType{}},
|
||||
"scope": {Name: "scope", Type: schema.JSONColumnType{}},
|
||||
|
||||
"events": {Name: "events", Type: schema.ArrayColumnType{
|
||||
ElementType: schema.ColumnTypeString,
|
||||
@@ -176,7 +177,7 @@ func (m *defaultFieldMapper) getColumn(
|
||||
case telemetrytypes.FieldContextResource:
|
||||
return []*schema.Column{indexV3Columns["resources_string"], indexV3Columns["resource"]}, nil
|
||||
case telemetrytypes.FieldContextScope:
|
||||
return []*schema.Column{}, qbtypes.ErrColumnNotFound
|
||||
return []*schema.Column{indexV3Columns["scope"]}, nil
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
switch key.FieldDataType {
|
||||
case telemetrytypes.FieldDataTypeString:
|
||||
@@ -278,14 +279,24 @@ func (m *defaultFieldMapper) FieldFor(
|
||||
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
// json is only supported for resource context as of now
|
||||
if key.FieldContext != telemetrytypes.FieldContextResource {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
}
|
||||
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
|
||||
// once clickHouse dependency is updated, we need to check if we can remove it.
|
||||
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", columnName, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", columnName, key.Name))
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", columnName, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", columnName, key.Name))
|
||||
case telemetrytypes.FieldContextScope:
|
||||
switch key.Name {
|
||||
case "scope.name", "scope.version":
|
||||
exprs = append(exprs, fmt.Sprintf("%s::String", key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s IS NOT NULL", key.Name))
|
||||
default:
|
||||
exprs = append(exprs, fmt.Sprintf("%s.attributes.`%s`::String", columnName, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s.attributes.`%s` IS NOT NULL", columnName, key.Name))
|
||||
}
|
||||
default:
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource and scope context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
}
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64,
|
||||
schema.ColumnTypeEnumUInt32,
|
||||
|
||||
@@ -82,6 +82,33 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Scope field - scope.name",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "scope.name",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
expectedResult: "scope.name::String",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Scope field - scope.version",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "scope.version",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
expectedResult: "scope.version::String",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Scope field - custom attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "custom.attr",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
expectedResult: "scope.attributes.`custom.attr`::String",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
// Query like `attribute.attribute_string:string` should resolve to `attributes_string['attribute_string']`.
|
||||
name: "Attribute key whose name collides with contextual map column resolves as a map lookup",
|
||||
|
||||
@@ -370,6 +370,94 @@ func TestStatementBuilder(t *testing.T) {
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "scope.name filter and group by",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Aggregations: []qbtypes.TraceAggregation{
|
||||
{
|
||||
Expression: "count()",
|
||||
},
|
||||
},
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "scope.name = 'opentelemetry-io'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "scope.name",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __limit_cte AS (SELECT toString(multiIf(scope.name::String IS NOT NULL, scope.name::String, NULL)) AS `scope.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.name::String = ? AND scope.name::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `scope.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(scope.name::String IS NOT NULL, scope.name::String, NULL)) AS `scope.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.name::String = ? AND scope.name::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`scope.name`) GLOBAL IN (SELECT `scope.name` FROM __limit_cte) GROUP BY ts, `scope.name`",
|
||||
Args: []any{"opentelemetry-io", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, "opentelemetry-io", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "scope.version filter with scope.name group by",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Aggregations: []qbtypes.TraceAggregation{
|
||||
{
|
||||
Expression: "count()",
|
||||
},
|
||||
},
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "scope.version = '1.0.0'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "scope.name",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __limit_cte AS (SELECT toString(multiIf(scope.name::String IS NOT NULL, scope.name::String, NULL)) AS `scope.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.version::String = ? AND scope.version::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `scope.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(scope.name::String IS NOT NULL, scope.name::String, NULL)) AS `scope.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.version::String = ? AND scope.version::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`scope.name`) GLOBAL IN (SELECT `scope.name` FROM __limit_cte) GROUP BY ts, `scope.name`",
|
||||
Args: []any{"1.0.0", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, "1.0.0", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "scope.version filter only (no scope field in group by)",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Aggregations: []qbtypes.TraceAggregation{
|
||||
{
|
||||
Expression: "count()",
|
||||
},
|
||||
},
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "scope.version = '1.0.0'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.version::String = ? AND scope.version::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.version::String = ? AND scope.version::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
|
||||
Args: []any{"1.0.0", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, "1.0.0", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
@@ -793,6 +881,32 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "List query with scope filter only (no scope in select or group by)",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
keysMap: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"scope.version": {
|
||||
{
|
||||
Name: "scope.version",
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "scope.version = '1.0.0'",
|
||||
},
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "SELECT timestamp AS `timestamp`, trace_id AS `trace_id`, span_id AS `span_id`, trace_state AS `trace_state`, parent_span_id AS `parent_span_id`, flags AS `flags`, name AS `name`, kind AS `kind`, kind_string AS `kind_string`, duration_nano AS `duration_nano`, status_code AS `status_code`, status_message AS `status_message`, status_code_string AS `status_code_string`, events AS `events`, links AS `links`, response_status_code AS `response_status_code`, external_http_url AS `external_http_url`, http_url AS `http_url`, external_http_method AS `external_http_method`, http_method AS `http_method`, http_host AS `http_host`, db_name AS `db_name`, db_operation AS `db_operation`, has_error AS `has_error`, is_remote AS `is_remote`, attributes_string, attributes_number, attributes_bool, resources_string FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.version::String = ? AND scope.version::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{"1.0.0", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
|
||||
@@ -113,6 +113,20 @@ func buildCompleteFieldKeyMap(releaseTime time.Time) map[string][]*telemetrytype
|
||||
FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
},
|
||||
},
|
||||
"scope.name": {
|
||||
{
|
||||
Name: "scope.name",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
"scope.version": {
|
||||
{
|
||||
Name: "scope.version",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, keys := range keysMap {
|
||||
for _, key := range keys {
|
||||
|
||||
24
tests/fixtures/querier.py
vendored
24
tests/fixtures/querier.py
vendored
@@ -862,6 +862,8 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
|
||||
"cloud.provider": "integration",
|
||||
"cloud.account.id": "000",
|
||||
"trace_id": "corrupt_data",
|
||||
"scope_name": "corrupt_data",
|
||||
"scope.scope.name": "corrupt_data",
|
||||
},
|
||||
attributes={
|
||||
"net.transport": "IP.TCP",
|
||||
@@ -870,7 +872,10 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
|
||||
"http.request.method": "POST",
|
||||
"http.response.status_code": "200",
|
||||
"timestamp": "corrupt_data",
|
||||
"version": "1.0.0",
|
||||
"scope.scope.version": "1.0.0",
|
||||
},
|
||||
scope={"name": "io.signoz.http.server", "version": "2.0.0"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=3.5),
|
||||
@@ -890,12 +895,24 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
|
||||
"cloud.provider": "integration",
|
||||
"cloud.account.id": "000",
|
||||
"timestamp": "corrupt_data",
|
||||
"scope.attributes.name": "corrupt_data",
|
||||
},
|
||||
attributes={
|
||||
"db.name": "integration",
|
||||
"db.operation": "SELECT",
|
||||
"db.statement": "SELECT * FROM integration",
|
||||
"trace_d": "corrupt_data",
|
||||
"scope.attributes.version": "corrupt_data",
|
||||
},
|
||||
scope={
|
||||
"name": "io.opentelemetry.contrib.http",
|
||||
"version": "1.0.0",
|
||||
"attributes": {
|
||||
"telemetry.sdk.language": "cpp",
|
||||
"name": "not-the-real-name",
|
||||
"version": "not-the-real-version",
|
||||
"attributes": "literally-a-key-named-attributes",
|
||||
},
|
||||
},
|
||||
),
|
||||
Traces(
|
||||
@@ -916,12 +933,15 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
|
||||
"cloud.provider": "integration",
|
||||
"cloud.account.id": "000",
|
||||
"duration_nano": "corrupt_data",
|
||||
"scope.scope.attributes.version": "corrupt_data",
|
||||
},
|
||||
attributes={
|
||||
"http.request.method": "PATCH",
|
||||
"http.status_code": "404",
|
||||
"id": "1",
|
||||
"scope.scope.version": "corrupt_data",
|
||||
},
|
||||
scope={"name": "io.signoz.http.client", "version": "2.0.0"},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=1),
|
||||
@@ -940,6 +960,7 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
|
||||
"host.name": "linux-001",
|
||||
"cloud.provider": "integration",
|
||||
"cloud.account.id": "001",
|
||||
"scope.scope.version": "corrupt_data",
|
||||
},
|
||||
attributes={
|
||||
"message.type": "SENT",
|
||||
@@ -947,6 +968,9 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
|
||||
"messaging.message.id": "001",
|
||||
"duration_nano": "corrupt_data",
|
||||
"id": 1,
|
||||
"scope": "corrupt_data",
|
||||
"scope.attributes.name": "corrupt_data",
|
||||
},
|
||||
scope={"name": "io.signoz.messaging", "version": "3.0.0"},
|
||||
),
|
||||
]
|
||||
|
||||
34
tests/fixtures/traces.py
vendored
34
tests/fixtures/traces.py
vendored
@@ -286,6 +286,7 @@ class Traces(ABC):
|
||||
db_operation: str
|
||||
has_error: bool
|
||||
is_remote: str
|
||||
scope_json: dict[str, Any]
|
||||
|
||||
resource: list[TracesResource]
|
||||
tag_attributes: list[TracesTagAttributes]
|
||||
@@ -311,6 +312,7 @@ class Traces(ABC):
|
||||
links: list[TracesLink] = [],
|
||||
trace_state: str = "",
|
||||
flags: np.uint32 = 0,
|
||||
scope: dict[str, Any] = {},
|
||||
resource_write_mode: Literal["legacy_only", "dual_write"] = "dual_write",
|
||||
) -> None:
|
||||
if timestamp is None:
|
||||
@@ -392,6 +394,35 @@ class Traces(ABC):
|
||||
# Calculate resource fingerprint
|
||||
self.resource_fingerprint = LogsOrTracesFingerprint(self.resources_string).calculate()
|
||||
|
||||
# Process scope mirroring the InstrumentationScope on the OTLP span.
|
||||
scope_name = scope.get("name", "")
|
||||
scope_version = scope.get("version", "")
|
||||
scope_string = {k: str(v) for k, v in scope.get("attributes", {}).items()}
|
||||
self.scope_json = {
|
||||
"name": scope_name,
|
||||
"version": scope_version,
|
||||
"attributes": scope_string,
|
||||
}
|
||||
|
||||
scope_keys = {"scope.name": scope_name, "scope.version": scope_version}
|
||||
scope_keys.update(scope_string)
|
||||
for k, v in scope_keys.items():
|
||||
if v == "":
|
||||
continue
|
||||
self.tag_attributes.append(
|
||||
TracesTagAttributes(
|
||||
timestamp=timestamp,
|
||||
tag_key=k,
|
||||
tag_type="scope",
|
||||
tag_data_type="string",
|
||||
string_value=v,
|
||||
number_value=None,
|
||||
)
|
||||
)
|
||||
self.attribute_keys.append(
|
||||
TracesResourceOrAttributeKeys(name=k, datatype="string", tag_type="scope")
|
||||
)
|
||||
|
||||
# Process attributes by type and populate custom fields
|
||||
self.attribute_string = {}
|
||||
self.attributes_number = {}
|
||||
@@ -644,6 +675,7 @@ class Traces(ABC):
|
||||
self.has_error,
|
||||
self.is_remote,
|
||||
self.resource_json,
|
||||
self.scope_json,
|
||||
],
|
||||
dtype=object,
|
||||
)
|
||||
@@ -675,6 +707,7 @@ class Traces(ABC):
|
||||
attributes=data.get("attributes", {}),
|
||||
trace_state=data.get("trace_state", ""),
|
||||
flags=data.get("flags", 0),
|
||||
scope=data.get("scope", {}),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@@ -814,6 +847,7 @@ def insert_traces_to_clickhouse(conn, traces: list[Traces]) -> None:
|
||||
"has_error",
|
||||
"is_remote",
|
||||
"resource",
|
||||
"scope",
|
||||
],
|
||||
data=[trace.np_arr() for trace in traces],
|
||||
)
|
||||
|
||||
@@ -709,6 +709,26 @@ def test_traces_list(
|
||||
x[1].trace_id,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
# Case 9: filter on the intrinsic scope.version. Only x[1] should match
|
||||
pytest.param(
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": False,
|
||||
"selectFields": [{"name": "timestamp"}],
|
||||
"filter": {"expression": "scope.version = '1.0.0'"},
|
||||
"limit": 1,
|
||||
},
|
||||
},
|
||||
HTTPStatus.OK,
|
||||
lambda x: [
|
||||
x[1].span_id,
|
||||
format_timestamp(x[1].timestamp),
|
||||
x[1].trace_id,
|
||||
], # type: Callable[[List[Traces]], List[Any]]
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_traces_list_with_corrupt_data(
|
||||
@@ -755,6 +775,153 @@ def test_traces_list_with_corrupt_data(
|
||||
assert data[key] == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"filter_expression,expected_indices",
|
||||
[
|
||||
# Intrinsic scope.name / scope.version resolve to the JSON sub-columns.
|
||||
pytest.param("scope.name = 'io.signoz.payment'", [1]),
|
||||
pytest.param("scope.version = '2.3.1'", [0]),
|
||||
# A scope attribute resolves against the scope JSON column's attributes.
|
||||
pytest.param("scope.telemetry.sdk.language = 'python'", [1]),
|
||||
# `env.tier` is a span attribute on span 0 and a scope attribute on
|
||||
# span 1. Unprefixed -> no explicit context, so it is checked in every
|
||||
# applicable context (attribute OR scope) and both spans match.
|
||||
pytest.param("env.tier = 'gold'", [0, 1]),
|
||||
# The explicit `scope.` prefix forces scope context only, so span 0's
|
||||
# span attribute is ignored — only span 1 matches.
|
||||
pytest.param("scope.env.tier = 'gold'", [1]),
|
||||
# `scope.name` matches BOTH the intrinsic scope.name field (span 0) and a
|
||||
# scope attribute literally named `name` (span 1's scope attribute
|
||||
# name='io.signoz.checkout').
|
||||
pytest.param("scope.name = 'io.signoz.checkout'", [0, 1]),
|
||||
# `scope.name` also matches a span attribute literally named `scope.name`
|
||||
# (attribute context) — span 2 carries attribute scope.name='attr-scope-name'.
|
||||
pytest.param("scope.name = 'attr-scope-name'", [2]),
|
||||
# An unprefixed `name` resolves to the intrinsic span `name` column and a
|
||||
# `name` scope attribute, but NOT the scope.name field. Span 2's span
|
||||
# name and span 1's scope attribute `name` both equal 'io.signoz.checkout';
|
||||
# span 0's scope.name field equals it too but is NOT matched.
|
||||
pytest.param("name = 'io.signoz.checkout'", [1, 2]),
|
||||
# A value that no resolvable key holds (scope.name/scope.version field,
|
||||
# a `name`/`version` scope attribute, or a same-named attribute/resource)
|
||||
# returns nothing.
|
||||
pytest.param("scope.version = 'corrupt_data'", []),
|
||||
pytest.param("scope.name = 'corrupt_data'", []),
|
||||
],
|
||||
)
|
||||
def test_traces_list_with_scope_filter(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
filter_expression: str,
|
||||
expected_indices: list[int],
|
||||
) -> None:
|
||||
"""
|
||||
Setup three spans that have different scope key resolution.
|
||||
Tests:
|
||||
- Filtering on scope.name / scope.version / a scope attribute.
|
||||
- An unprefixed key is resolved across contexts (scope checked alongside
|
||||
attribute / intrinsic), while a `scope.`-prefixed key is scope-only.
|
||||
- `scope.name` hits the intrinsic field, a `name` scope attribute, and a
|
||||
span attribute `scope.name` (cross-context), while a bare
|
||||
`name` hits the span name column (and a `name` scope attribute) but never
|
||||
the scope.name field.
|
||||
"""
|
||||
trace_id = TraceIdGenerator.trace_id()
|
||||
span_ids = [TraceIdGenerator.span_id() for _ in range(3)]
|
||||
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
|
||||
|
||||
traces = [
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=4),
|
||||
duration=timedelta(seconds=2),
|
||||
trace_id=trace_id,
|
||||
span_id=span_ids[0],
|
||||
parent_span_id="",
|
||||
name="GET /checkout",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
resources={"service.name": "checkout"},
|
||||
attributes={"http.request.method": "GET", "env.tier": "gold"},
|
||||
scope={
|
||||
"name": "io.signoz.checkout",
|
||||
"version": "2.3.1",
|
||||
"attributes": {"telemetry.sdk.language": "go"},
|
||||
},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=2),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=trace_id,
|
||||
span_id=span_ids[1],
|
||||
parent_span_id="",
|
||||
name="POST /pay",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
resources={"service.name": "payment"},
|
||||
attributes={"http.request.method": "POST"},
|
||||
# env.tier is a scope attribute here (cross-context with span 0);
|
||||
# `name` is a scope attribute colliding with span 0's scope.name.
|
||||
scope={
|
||||
"name": "io.signoz.payment",
|
||||
"version": "4.5.6",
|
||||
"attributes": {
|
||||
"telemetry.sdk.language": "python",
|
||||
"env.tier": "gold",
|
||||
"name": "io.signoz.checkout",
|
||||
},
|
||||
},
|
||||
),
|
||||
Traces(
|
||||
timestamp=now - timedelta(seconds=1),
|
||||
duration=timedelta(seconds=1),
|
||||
trace_id=trace_id,
|
||||
span_id=span_ids[2],
|
||||
parent_span_id="",
|
||||
# span name collides with span 0's scope.name value
|
||||
name="io.signoz.checkout",
|
||||
kind=TracesKind.SPAN_KIND_SERVER,
|
||||
status_code=TracesStatusCode.STATUS_CODE_OK,
|
||||
resources={"service.name": "probe"},
|
||||
# a span attribute named `scope.name`
|
||||
attributes={"scope.name": "attr-scope-name"},
|
||||
scope={"name": "span-gamma", "version": "9.9.9"},
|
||||
),
|
||||
]
|
||||
insert_traces(traces)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int((datetime.now(tz=UTC) - timedelta(minutes=5)).timestamp() * 1000),
|
||||
end_ms=int(datetime.now(tz=UTC).timestamp() * 1000),
|
||||
request_type="raw",
|
||||
queries=[
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"disabled": False,
|
||||
"selectFields": [{"name": "timestamp"}],
|
||||
"filter": {"expression": filter_expression},
|
||||
"limit": 10,
|
||||
},
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
rows = response.json()["data"]["data"]["results"][0]["rows"] or []
|
||||
got_span_ids = {row["data"]["span_id"] for row in rows}
|
||||
expected_span_ids = {traces[i].span_id for i in expected_indices}
|
||||
assert got_span_ids == expected_span_ids
|
||||
|
||||
|
||||
def _verify_events_links_full(rows: list[dict], traces: list[Traces]) -> None:
|
||||
"""Empty-selectFields case: events/links arrive parsed into structured objects.
|
||||
Every row's events/links should match the fixture's stored parsed shape
|
||||
|
||||
Reference in New Issue
Block a user