Compare commits

...

3 Commits

Author SHA1 Message Date
Nikhil Soni
1d8a7429c5 feat: support promoted path in trace attributes queries 2026-06-23 23:36:22 +05:30
Nikhil Soni
9bb3f86a8a chore: fix formating 2026-06-23 23:33:27 +05:30
Nikhil Soni
d3a3b61716 feat: use json trace attribute for select/group by clause 2026-06-23 18:48:04 +05:30
9 changed files with 197 additions and 9 deletions

View File

@@ -2418,6 +2418,14 @@ func (k *telemetryMetaStore) updateColumnEvolutionMetadataForKeys(ctx context.Co
selector.FieldName = key.Name
if keyEvolutions, ok := evolutionsByUniqueKey[selector.QualifiedName()]; ok {
keysToUpdate[i].Evolutions = keyEvolutions
for _, evolution := range keyEvolutions {
if evolution.ColumnName == telemetrytraces.ColumnAttributesPromoted {
// field has been promoted into the dedicated JSON sub-column.
keysToUpdate[i].Promoted = true
break
}
}
}
}
}

View File

@@ -41,6 +41,9 @@ func (c *conditionBuilder) conditionFor(
value = querybuilder.FormatValueForContains(value)
}
// For filters, field mapper keeps WHERE on the map columns instead of JSON column.
ctx = qbtypes.WithFilterIntent(ctx)
// first, locate the raw column type (so we can choose the right EXISTS logic)
columns, err := c.fm.ColumnFor(ctx, startNs, endNs, key)
if err != nil {

View File

@@ -4,6 +4,11 @@ import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const (
ColumnAttributes = "attributes"
ColumnAttributesPromoted = "attributes_promoted"
)
var (
IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
"trace_id": {

View File

@@ -46,6 +46,10 @@ var (
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeBool,
}},
ColumnAttributes: {Name: ColumnAttributes, Type: schema.JSONColumnType{}},
ColumnAttributesPromoted: {Name: ColumnAttributesPromoted, Type: schema.JSONColumnType{}},
"resources_string": {Name: "resources_string", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
@@ -167,6 +171,54 @@ func NewFieldMapper() *defaultFieldMapper {
return &defaultFieldMapper{}
}
// chJSONTypeAndMapColumn returns the ClickHouse dynamic type used to read an
// attribute value out of a JSON column and the corresponding map column used as fallback
func chJSONTypeAndMapColumn(dt telemetrytypes.FieldDataType) (chType string, mapColumn string, ok bool) {
switch dt {
case telemetrytypes.FieldDataTypeString:
return "String", "attributes_string", true
case telemetrytypes.FieldDataTypeInt64,
telemetrytypes.FieldDataTypeFloat64,
telemetrytypes.FieldDataTypeNumber:
return "Float64", "attributes_number", true
case telemetrytypes.FieldDataTypeBool:
return "Bool", "attributes_bool", true
}
return "", "", false
}
// buildAttributeJSONExpr builds a value expression for an attribute key that
// prefers the JSON columns and falls back to the map column. When promoted is
// true the attributes_promoted sub-column (dedicated, fastest) is tried first:
//
// promoted: multiIf(attributes_promoted.`k` IS NOT NULL, attributes_promoted.`k`::T,
// attributes.`k` IS NOT NULL, attributes.`k`::T, attributes_<t>['k'])
// non-promoted: multiIf(attributes.`k` IS NOT NULL, attributes.`k`::T, attributes_<t>['k'])
func buildAttributeJSONExpr(key *telemetrytypes.TelemetryFieldKey, promoted bool) (string, bool) {
chType, mapColumn, ok := chJSONTypeAndMapColumn(key.FieldDataType)
if !ok {
return "", false
}
branches := []string{}
if promoted {
promotedPath := fmt.Sprintf("%s.`%s`", ColumnAttributesPromoted, key.Name)
branches = append(branches,
fmt.Sprintf("%s IS NOT NULL", promotedPath),
fmt.Sprintf("%s::%s", promotedPath, chType),
)
}
jsonPath := fmt.Sprintf("%s.`%s`", ColumnAttributes, key.Name)
branches = append(branches,
fmt.Sprintf("%s IS NOT NULL", jsonPath),
fmt.Sprintf("%s::%s", jsonPath, chType),
)
mapAccess := fmt.Sprintf("%s['%s']", mapColumn, key.Name)
expr := fmt.Sprintf("multiIf(%s, %s)", strings.Join(branches, ", "), mapAccess)
return expr, true
}
func (m *defaultFieldMapper) getColumn(
_ context.Context,
_, _ uint64,
@@ -250,6 +302,12 @@ func (m *defaultFieldMapper) FieldFor(
return key.Name, nil
}
if key.FieldContext == telemetrytypes.FieldContextAttribute && !key.Materialized && !qbtypes.IsFilterIntent(ctx) {
if expr, ok := buildAttributeJSONExpr(key, key.Promoted); ok {
return expr, nil
}
}
columns, err := m.getColumn(ctx, startNs, endNs, key)
if err != nil {
return "", err

View File

@@ -31,33 +31,33 @@ func TestGetFieldKeyName(t *testing.T) {
expectedError: nil,
},
{
name: "Map column type - string attribute",
name: "Attribute - string attribute uses JSON with map fallback",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
expectedResult: "attributes_string['user.id']",
expectedResult: "multiIf(attributes.`user.id` IS NOT NULL, attributes.`user.id`::String, attributes_string['user.id'])",
expectedError: nil,
},
{
name: "Map column type - number attribute",
name: "Attribute - number attribute uses JSON with map fallback",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.size",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
expectedResult: "attributes_number['request.size']",
expectedResult: "multiIf(attributes.`request.size` IS NOT NULL, attributes.`request.size`::Float64, attributes_number['request.size'])",
expectedError: nil,
},
{
name: "Map column type - bool attribute",
name: "Attribute - bool attribute uses JSON with map fallback",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.success",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeBool,
},
expectedResult: "attributes_bool['request.success']",
expectedResult: "multiIf(attributes.`request.success` IS NOT NULL, attributes.`request.success`::Bool, attributes_bool['request.success'])",
expectedError: nil,
},
{
@@ -190,3 +190,101 @@ func TestFieldForResourceWithEvolution(t *testing.T) {
})
}
}
func TestFieldForAttributeJSON(t *testing.T) {
start := uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano())
end := uint64(time.Date(2024, 6, 5, 0, 0, 0, 0, time.UTC).UnixNano())
stringKey := telemetrytypes.TelemetryFieldKey{
Name: "http.method",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
}
numberKey := telemetrytypes.TelemetryFieldKey{
Name: "http.status_code",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
}
boolKey := telemetrytypes.TelemetryFieldKey{
Name: "request.success",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeBool,
}
// http.route has a real materialized ($$) physical column.
legacyMaterializedKey := telemetrytypes.TelemetryFieldKey{
Name: "http.route",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
}
promotedStringKey := telemetrytypes.TelemetryFieldKey{
Name: "http.method",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
Promoted: true,
}
promotedNumberKey := telemetrytypes.TelemetryFieldKey{
Name: "http.status_code",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
Promoted: true,
}
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
filterIntent bool
expectedResult string
}{
{
name: "select/group by uses attributes JSON with map fallback - string",
key: stringKey,
expectedResult: "multiIf(attributes.`http.method` IS NOT NULL, attributes.`http.method`::String, attributes_string['http.method'])",
},
{
name: "select/group by uses attributes JSON with map fallback - number",
key: numberKey,
expectedResult: "multiIf(attributes.`http.status_code` IS NOT NULL, attributes.`http.status_code`::Float64, attributes_number['http.status_code'])",
},
{
name: "select/group by uses attributes JSON with map fallback - bool",
key: boolKey,
expectedResult: "multiIf(attributes.`request.success` IS NOT NULL, attributes.`request.success`::Bool, attributes_bool['request.success'])",
},
{
name: "promoted string key tries attributes_promoted first",
key: promotedStringKey,
expectedResult: "multiIf(attributes_promoted.`http.method` IS NOT NULL, attributes_promoted.`http.method`::String, attributes.`http.method` IS NOT NULL, attributes.`http.method`::String, attributes_string['http.method'])",
},
{
name: "promoted number key tries attributes_promoted first",
key: promotedNumberKey,
expectedResult: "multiIf(attributes_promoted.`http.status_code` IS NOT NULL, attributes_promoted.`http.status_code`::Float64, attributes.`http.status_code` IS NOT NULL, attributes.`http.status_code`::Float64, attributes_number['http.status_code'])",
},
{
name: "filter intent stays on map column",
key: stringKey,
filterIntent: true,
expectedResult: "attributes_string['http.method']",
},
{
name: "legacy materialized key keeps physical column",
key: legacyMaterializedKey,
expectedResult: "`attribute_string_http$$route`",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper()
ctx := context.Background()
if tc.filterIntent {
ctx = qbtypes.WithFilterIntent(ctx)
}
key := tc.key
result, err := fm.FieldFor(ctx, start, end, &key)
require.NoError(t, err)
assert.Equal(t, tc.expectedResult, result)
})
}
}

View File

@@ -235,7 +235,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(attributes_number['metric.max_count']), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 desc LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(attributes_number['metric.max_count']), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY ts desc",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(multiIf(attributes.`metric.max_count` IS NOT NULL, attributes.`metric.max_count`::Float64, attributes_number['metric.max_count'])), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 desc LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(multiIf(attributes.`metric.max_count` IS NOT NULL, attributes.`metric.max_count`::Float64, attributes_number['metric.max_count'])), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY ts desc",
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
expectedErr: nil,
@@ -491,7 +491,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY multiIf(attributes.`user.id` IS NOT NULL, attributes.`user.id`::String, attributes_string['user.id']) AS `user.id` desc LIMIT ?",
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,

View File

@@ -732,5 +732,5 @@ func TestTraceOperatorStatementBuilderDeduplicatesKeys(t *testing.T) {
require.NoError(t, err)
require.Contains(t, q.Query,
"SELECT toString(multiIf(mapContains(attributes_string, 'http.method') = ?, attributes_string['http.method'], NULL)) AS `http.method`, count() AS __result_0 FROM A GROUP BY `http.method` ORDER BY __result_0 DESC")
"SELECT toString(multiIf(mapContains(attributes_string, 'http.method') = ?, multiIf(attributes.`http.method` IS NOT NULL, attributes.`http.method`::String, attributes_string['http.method']), NULL)) AS `http.method`, count() AS __result_0 FROM A GROUP BY `http.method` ORDER BY __result_0 DESC")
}

View File

@@ -0,0 +1,15 @@
package querybuildertypesv5
import "context"
// filterIntentKey is the context key used to mark that usage is for a filter (WHERE) clause
type filterIntentKey struct{}
func WithFilterIntent(ctx context.Context) context.Context {
return context.WithValue(ctx, filterIntentKey{}, true)
}
func IsFilterIntent(ctx context.Context) bool {
v, ok := ctx.Value(filterIntentKey{}).(bool)
return ok && v
}

View File

@@ -40,6 +40,7 @@ type TelemetryFieldKey struct {
JSONPlan JSONAccessPlan `json:"-"`
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
Promoted bool `json:"-"`
Evolutions []*EvolutionEntry `json:"-"`
}