mirror of
https://github.com/SigNoz/signoz.git
synced 2026-06-24 00:50:35 +01:00
Compare commits
3 Commits
main
...
ns/json-tr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d8a7429c5 | ||
|
|
9bb3f86a8a | ||
|
|
d3a3b61716 |
@@ -2418,6 +2418,14 @@ func (k *telemetryMetaStore) updateColumnEvolutionMetadataForKeys(ctx context.Co
|
||||
selector.FieldName = key.Name
|
||||
if keyEvolutions, ok := evolutionsByUniqueKey[selector.QualifiedName()]; ok {
|
||||
keysToUpdate[i].Evolutions = keyEvolutions
|
||||
|
||||
for _, evolution := range keyEvolutions {
|
||||
if evolution.ColumnName == telemetrytraces.ColumnAttributesPromoted {
|
||||
// field has been promoted into the dedicated JSON sub-column.
|
||||
keysToUpdate[i].Promoted = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,6 +41,9 @@ func (c *conditionBuilder) conditionFor(
|
||||
value = querybuilder.FormatValueForContains(value)
|
||||
}
|
||||
|
||||
// For filters, field mapper keeps WHERE on the map columns instead of JSON column.
|
||||
ctx = qbtypes.WithFilterIntent(ctx)
|
||||
|
||||
// first, locate the raw column type (so we can choose the right EXISTS logic)
|
||||
columns, err := c.fm.ColumnFor(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
|
||||
@@ -4,6 +4,11 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
// Names of the JSON attribute columns referenced when building span queries.
const (
	// ColumnAttributes is the name of the JSON column holding span attributes.
	ColumnAttributes = "attributes"

	// ColumnAttributesPromoted is the name of the JSON column that holds
	// attributes promoted into a dedicated sub-column.
	ColumnAttributesPromoted = "attributes_promoted"
)
|
||||
|
||||
var (
|
||||
IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
|
||||
"trace_id": {
|
||||
|
||||
@@ -46,6 +46,10 @@ var (
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeBool,
|
||||
}},
|
||||
|
||||
ColumnAttributes: {Name: ColumnAttributes, Type: schema.JSONColumnType{}},
|
||||
ColumnAttributesPromoted: {Name: ColumnAttributesPromoted, Type: schema.JSONColumnType{}},
|
||||
|
||||
"resources_string": {Name: "resources_string", Type: schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
@@ -167,6 +171,54 @@ func NewFieldMapper() *defaultFieldMapper {
|
||||
return &defaultFieldMapper{}
|
||||
}
|
||||
|
||||
// chJSONTypeAndMapColumn returns the ClickHouse dynamic type used to read an
|
||||
// attribute value out of a JSON column and the corresponding map column used as fallback
|
||||
func chJSONTypeAndMapColumn(dt telemetrytypes.FieldDataType) (chType string, mapColumn string, ok bool) {
|
||||
switch dt {
|
||||
case telemetrytypes.FieldDataTypeString:
|
||||
return "String", "attributes_string", true
|
||||
case telemetrytypes.FieldDataTypeInt64,
|
||||
telemetrytypes.FieldDataTypeFloat64,
|
||||
telemetrytypes.FieldDataTypeNumber:
|
||||
return "Float64", "attributes_number", true
|
||||
case telemetrytypes.FieldDataTypeBool:
|
||||
return "Bool", "attributes_bool", true
|
||||
}
|
||||
return "", "", false
|
||||
}
|
||||
|
||||
// buildAttributeJSONExpr builds a value expression for an attribute key that
|
||||
// prefers the JSON columns and falls back to the map column. When promoted is
|
||||
// true the attributes_promoted sub-column (dedicated, fastest) is tried first:
|
||||
//
|
||||
// promoted: multiIf(attributes_promoted.`k` IS NOT NULL, attributes_promoted.`k`::T,
|
||||
// attributes.`k` IS NOT NULL, attributes.`k`::T, attributes_<t>['k'])
|
||||
// non-promoted: multiIf(attributes.`k` IS NOT NULL, attributes.`k`::T, attributes_<t>['k'])
|
||||
func buildAttributeJSONExpr(key *telemetrytypes.TelemetryFieldKey, promoted bool) (string, bool) {
|
||||
chType, mapColumn, ok := chJSONTypeAndMapColumn(key.FieldDataType)
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
|
||||
branches := []string{}
|
||||
if promoted {
|
||||
promotedPath := fmt.Sprintf("%s.`%s`", ColumnAttributesPromoted, key.Name)
|
||||
branches = append(branches,
|
||||
fmt.Sprintf("%s IS NOT NULL", promotedPath),
|
||||
fmt.Sprintf("%s::%s", promotedPath, chType),
|
||||
)
|
||||
}
|
||||
jsonPath := fmt.Sprintf("%s.`%s`", ColumnAttributes, key.Name)
|
||||
branches = append(branches,
|
||||
fmt.Sprintf("%s IS NOT NULL", jsonPath),
|
||||
fmt.Sprintf("%s::%s", jsonPath, chType),
|
||||
)
|
||||
|
||||
mapAccess := fmt.Sprintf("%s['%s']", mapColumn, key.Name)
|
||||
expr := fmt.Sprintf("multiIf(%s, %s)", strings.Join(branches, ", "), mapAccess)
|
||||
return expr, true
|
||||
}
|
||||
|
||||
func (m *defaultFieldMapper) getColumn(
|
||||
_ context.Context,
|
||||
_, _ uint64,
|
||||
@@ -250,6 +302,12 @@ func (m *defaultFieldMapper) FieldFor(
|
||||
return key.Name, nil
|
||||
}
|
||||
|
||||
if key.FieldContext == telemetrytypes.FieldContextAttribute && !key.Materialized && !qbtypes.IsFilterIntent(ctx) {
|
||||
if expr, ok := buildAttributeJSONExpr(key, key.Promoted); ok {
|
||||
return expr, nil
|
||||
}
|
||||
}
|
||||
|
||||
columns, err := m.getColumn(ctx, startNs, endNs, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
||||
@@ -31,33 +31,33 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - string attribute",
|
||||
name: "Attribute - string attribute uses JSON with map fallback",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "user.id",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
expectedResult: "attributes_string['user.id']",
|
||||
expectedResult: "multiIf(attributes.`user.id` IS NOT NULL, attributes.`user.id`::String, attributes_string['user.id'])",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - number attribute",
|
||||
name: "Attribute - number attribute uses JSON with map fallback",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.size",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
expectedResult: "attributes_number['request.size']",
|
||||
expectedResult: "multiIf(attributes.`request.size` IS NOT NULL, attributes.`request.size`::Float64, attributes_number['request.size'])",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - bool attribute",
|
||||
name: "Attribute - bool attribute uses JSON with map fallback",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.success",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
},
|
||||
expectedResult: "attributes_bool['request.success']",
|
||||
expectedResult: "multiIf(attributes.`request.success` IS NOT NULL, attributes.`request.success`::Bool, attributes_bool['request.success'])",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -190,3 +190,101 @@ func TestFieldForResourceWithEvolution(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldForAttributeJSON(t *testing.T) {
|
||||
start := uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano())
|
||||
end := uint64(time.Date(2024, 6, 5, 0, 0, 0, 0, time.UTC).UnixNano())
|
||||
|
||||
stringKey := telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
}
|
||||
numberKey := telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
}
|
||||
boolKey := telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.success",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
}
|
||||
// http.route has a real materialized ($$) physical column.
|
||||
legacyMaterializedKey := telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.route",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
}
|
||||
promotedStringKey := telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Promoted: true,
|
||||
}
|
||||
promotedNumberKey := telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
Promoted: true,
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
filterIntent bool
|
||||
expectedResult string
|
||||
}{
|
||||
{
|
||||
name: "select/group by uses attributes JSON with map fallback - string",
|
||||
key: stringKey,
|
||||
expectedResult: "multiIf(attributes.`http.method` IS NOT NULL, attributes.`http.method`::String, attributes_string['http.method'])",
|
||||
},
|
||||
{
|
||||
name: "select/group by uses attributes JSON with map fallback - number",
|
||||
key: numberKey,
|
||||
expectedResult: "multiIf(attributes.`http.status_code` IS NOT NULL, attributes.`http.status_code`::Float64, attributes_number['http.status_code'])",
|
||||
},
|
||||
{
|
||||
name: "select/group by uses attributes JSON with map fallback - bool",
|
||||
key: boolKey,
|
||||
expectedResult: "multiIf(attributes.`request.success` IS NOT NULL, attributes.`request.success`::Bool, attributes_bool['request.success'])",
|
||||
},
|
||||
{
|
||||
name: "promoted string key tries attributes_promoted first",
|
||||
key: promotedStringKey,
|
||||
expectedResult: "multiIf(attributes_promoted.`http.method` IS NOT NULL, attributes_promoted.`http.method`::String, attributes.`http.method` IS NOT NULL, attributes.`http.method`::String, attributes_string['http.method'])",
|
||||
},
|
||||
{
|
||||
name: "promoted number key tries attributes_promoted first",
|
||||
key: promotedNumberKey,
|
||||
expectedResult: "multiIf(attributes_promoted.`http.status_code` IS NOT NULL, attributes_promoted.`http.status_code`::Float64, attributes.`http.status_code` IS NOT NULL, attributes.`http.status_code`::Float64, attributes_number['http.status_code'])",
|
||||
},
|
||||
{
|
||||
name: "filter intent stays on map column",
|
||||
key: stringKey,
|
||||
filterIntent: true,
|
||||
expectedResult: "attributes_string['http.method']",
|
||||
},
|
||||
{
|
||||
name: "legacy materialized key keeps physical column",
|
||||
key: legacyMaterializedKey,
|
||||
expectedResult: "`attribute_string_http$$route`",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
ctx := context.Background()
|
||||
if tc.filterIntent {
|
||||
ctx = qbtypes.WithFilterIntent(ctx)
|
||||
}
|
||||
key := tc.key
|
||||
result, err := fm.FieldFor(ctx, start, end, &key)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedResult, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -235,7 +235,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(attributes_number['metric.max_count']), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 desc LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(attributes_number['metric.max_count']), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? 
AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY ts desc",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(multiIf(attributes.`metric.max_count` IS NOT NULL, attributes.`metric.max_count`::Float64, attributes_number['metric.max_count'])), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 desc LIMIT ?) 
SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(multiIf(attributes.`metric.max_count` IS NOT NULL, attributes.`metric.max_count`::Float64, attributes_number['metric.max_count'])), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY ts desc",
|
||||
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -491,7 +491,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY multiIf(attributes.`user.id` IS NOT NULL, attributes.`user.id`::String, attributes_string['user.id']) AS `user.id` desc LIMIT ?",
|
||||
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
|
||||
@@ -732,5 +732,5 @@ func TestTraceOperatorStatementBuilderDeduplicatesKeys(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Contains(t, q.Query,
|
||||
"SELECT toString(multiIf(mapContains(attributes_string, 'http.method') = ?, attributes_string['http.method'], NULL)) AS `http.method`, count() AS __result_0 FROM A GROUP BY `http.method` ORDER BY __result_0 DESC")
|
||||
"SELECT toString(multiIf(mapContains(attributes_string, 'http.method') = ?, multiIf(attributes.`http.method` IS NOT NULL, attributes.`http.method`::String, attributes_string['http.method']), NULL)) AS `http.method`, count() AS __result_0 FROM A GROUP BY `http.method` ORDER BY __result_0 DESC")
|
||||
}
|
||||
|
||||
15
pkg/types/querybuildertypes/querybuildertypesv5/context.go
Normal file
15
pkg/types/querybuildertypes/querybuildertypesv5/context.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package querybuildertypesv5
|
||||
|
||||
import "context"
|
||||
|
||||
// filterIntentKey is the context key type used to mark that a context is being
// used to build a filter (WHERE) clause. Being an unexported struct type, it
// cannot collide with context keys defined by other packages.
type filterIntentKey struct{}

// WithFilterIntent returns a context derived from ctx that carries the
// filter-intent marker.
func WithFilterIntent(ctx context.Context) context.Context {
	marked := context.WithValue(ctx, filterIntentKey{}, true)
	return marked
}

// IsFilterIntent reports whether ctx carries the filter-intent marker set by
// WithFilterIntent. It returns false when the marker is absent.
func IsFilterIntent(ctx context.Context) bool {
	// A missing value fails the type assertion and leaves marked at false.
	marked, _ := ctx.Value(filterIntentKey{}).(bool)
	return marked
}
|
||||
@@ -40,6 +40,7 @@ type TelemetryFieldKey struct {
|
||||
JSONPlan JSONAccessPlan `json:"-"`
|
||||
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
|
||||
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
|
||||
Promoted bool `json:"-"`
|
||||
|
||||
Evolutions []*EvolutionEntry `json:"-"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user