Compare commits

...

2 Commits

Author SHA1 Message Date
Nikhil Soni
9bb3f86a8a chore: fix formating 2026-06-23 23:33:27 +05:30
Nikhil Soni
d3a3b61716 feat: use json trace attribute for select/group by clause 2026-06-23 18:48:04 +05:30
7 changed files with 151 additions and 9 deletions

View File

@@ -41,6 +41,9 @@ func (c *conditionBuilder) conditionFor(
value = querybuilder.FormatValueForContains(value)
}
// For filters, field mapper keeps WHERE on the map columns instead of JSON column.
ctx = qbtypes.WithFilterIntent(ctx)
// first, locate the raw column type (so we can choose the right EXISTS logic)
columns, err := c.fm.ColumnFor(ctx, startNs, endNs, key)
if err != nil {

View File

@@ -4,6 +4,11 @@ import (
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
const (
ColumnAttributes = "attributes"
ColumnAttributesPromoted = "attributes_promoted"
)
var (
IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
"trace_id": {

View File

@@ -46,6 +46,10 @@ var (
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeBool,
}},
ColumnAttributes: {Name: ColumnAttributes, Type: schema.JSONColumnType{}},
ColumnAttributesPromoted: {Name: ColumnAttributesPromoted, Type: schema.JSONColumnType{}},
"resources_string": {Name: "resources_string", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
@@ -167,6 +171,40 @@ func NewFieldMapper() *defaultFieldMapper {
return &defaultFieldMapper{}
}
// chJSONTypeAndMapColumn returns the ClickHouse dynamic type used to read an
// attribute value out of a JSON column and the corresponding map column used as fallback
func chJSONTypeAndMapColumn(dt telemetrytypes.FieldDataType) (chType string, mapColumn string, ok bool) {
switch dt {
case telemetrytypes.FieldDataTypeString:
return "String", "attributes_string", true
case telemetrytypes.FieldDataTypeInt64,
telemetrytypes.FieldDataTypeFloat64,
telemetrytypes.FieldDataTypeNumber:
return "Float64", "attributes_number", true
case telemetrytypes.FieldDataTypeBool:
return "Bool", "attributes_bool", true
}
return "", "", false
}
// buildAttributeJSONExpr builds a value expression for an attribute key that
// prefers the JSON columns and falls back to the map column
//
// TODO: once promoted-path metadata is wired, prepend an
// attributes_promoted.`k` branch for promoted keys. The map fallback branch
// also lets the existing Evolutions mechanism prune to a single column once
// evolution entries are populated for these columns.
func buildAttributeJSONExpr(key *telemetrytypes.TelemetryFieldKey) (string, bool) {
chType, mapColumn, ok := chJSONTypeAndMapColumn(key.FieldDataType)
if !ok {
return "", false
}
jsonPath := fmt.Sprintf("%s.`%s`", ColumnAttributes, key.Name)
mapAccess := fmt.Sprintf("%s['%s']", mapColumn, key.Name)
expr := fmt.Sprintf("multiIf(%s IS NOT NULL, %s::%s, %s)", jsonPath, jsonPath, chType, mapAccess)
return expr, true
}
func (m *defaultFieldMapper) getColumn(
_ context.Context,
_, _ uint64,
@@ -250,6 +288,12 @@ func (m *defaultFieldMapper) FieldFor(
return key.Name, nil
}
if key.FieldContext == telemetrytypes.FieldContextAttribute && !key.Materialized && !qbtypes.IsFilterIntent(ctx) {
if expr, ok := buildAttributeJSONExpr(key, key.Promoted); ok {
return expr, nil
}
}
columns, err := m.getColumn(ctx, startNs, endNs, key)
if err != nil {
return "", err

View File

@@ -31,33 +31,33 @@ func TestGetFieldKeyName(t *testing.T) {
expectedError: nil,
},
{
name: "Map column type - string attribute",
name: "Attribute - string attribute uses JSON with map fallback",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
expectedResult: "attributes_string['user.id']",
expectedResult: "multiIf(attributes.`user.id` IS NOT NULL, attributes.`user.id`::String, attributes_string['user.id'])",
expectedError: nil,
},
{
name: "Map column type - number attribute",
name: "Attribute - number attribute uses JSON with map fallback",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.size",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
expectedResult: "attributes_number['request.size']",
expectedResult: "multiIf(attributes.`request.size` IS NOT NULL, attributes.`request.size`::Float64, attributes_number['request.size'])",
expectedError: nil,
},
{
name: "Map column type - bool attribute",
name: "Attribute - bool attribute uses JSON with map fallback",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.success",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeBool,
},
expectedResult: "attributes_bool['request.success']",
expectedResult: "multiIf(attributes.`request.success` IS NOT NULL, attributes.`request.success`::Bool, attributes_bool['request.success'])",
expectedError: nil,
},
{
@@ -190,3 +190,78 @@ func TestFieldForResourceWithEvolution(t *testing.T) {
})
}
}
func TestFieldForAttributeJSON(t *testing.T) {
start := uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano())
end := uint64(time.Date(2024, 6, 5, 0, 0, 0, 0, time.UTC).UnixNano())
stringKey := telemetrytypes.TelemetryFieldKey{
Name: "http.method",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
}
numberKey := telemetrytypes.TelemetryFieldKey{
Name: "http.status_code",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
}
boolKey := telemetrytypes.TelemetryFieldKey{
Name: "request.success",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeBool,
}
materializedKey := telemetrytypes.TelemetryFieldKey{
Name: "http.method",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
}
testCases := []struct {
name string
key telemetrytypes.TelemetryFieldKey
filterIntent bool
expectedResult string
}{
{
name: "select/group by uses attributes JSON with map fallback - string",
key: stringKey,
expectedResult: "multiIf(attributes.`http.method` IS NOT NULL, attributes.`http.method`::String, attributes_string['http.method'])",
},
{
name: "select/group by uses attributes JSON with map fallback - number",
key: numberKey,
expectedResult: "multiIf(attributes.`http.status_code` IS NOT NULL, attributes.`http.status_code`::Float64, attributes_number['http.status_code'])",
},
{
name: "select/group by uses attributes JSON with map fallback - bool",
key: boolKey,
expectedResult: "multiIf(attributes.`request.success` IS NOT NULL, attributes.`request.success`::Bool, attributes_bool['request.success'])",
},
{
name: "filter intent stays on map column",
key: stringKey,
filterIntent: true,
expectedResult: "attributes_string['http.method']",
},
{
name: "materialized key keeps legacy physical column",
key: materializedKey,
expectedResult: "`attribute_string_http$$method`",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper()
ctx := context.Background()
if tc.filterIntent {
ctx = qbtypes.WithFilterIntent(ctx)
}
key := tc.key
result, err := fm.FieldFor(ctx, start, end, &key)
require.NoError(t, err)
assert.Equal(t, tc.expectedResult, result)
})
}
}

View File

@@ -235,7 +235,7 @@ func TestStatementBuilder(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(attributes_number['metric.max_count']), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 desc LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(attributes_number['metric.max_count']), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY ts desc",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint), __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(multiIf(attributes.`metric.max_count` IS NOT NULL, attributes.`metric.max_count`::Float64, attributes_number['metric.max_count'])), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 desc LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, sum(multiIf(mapContains(attributes_number, 'metric.max_count') = ?, toFloat64(multiIf(attributes.`metric.max_count` IS NOT NULL, attributes.`metric.max_count`::Float64, attributes_number['metric.max_count'])), NULL)) AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name` ORDER BY ts desc",
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
expectedErr: nil,
@@ -491,7 +491,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY attributes_string['user.id'] AS `user.id` desc LIMIT ?",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ? GROUP BY fingerprint) SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY multiIf(attributes.`user.id` IS NOT NULL, attributes.`user.id`::String, attributes_string['user.id']) AS `user.id` desc LIMIT ?",
Args: []any{"redis-manual", "%service.name%", "%service.name\":\"redis-manual%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
expectedErr: nil,

View File

@@ -732,5 +732,5 @@ func TestTraceOperatorStatementBuilderDeduplicatesKeys(t *testing.T) {
require.NoError(t, err)
require.Contains(t, q.Query,
"SELECT toString(multiIf(mapContains(attributes_string, 'http.method') = ?, attributes_string['http.method'], NULL)) AS `http.method`, count() AS __result_0 FROM A GROUP BY `http.method` ORDER BY __result_0 DESC")
"SELECT toString(multiIf(mapContains(attributes_string, 'http.method') = ?, multiIf(attributes.`http.method` IS NOT NULL, attributes.`http.method`::String, attributes_string['http.method']), NULL)) AS `http.method`, count() AS __result_0 FROM A GROUP BY `http.method` ORDER BY __result_0 DESC")
}

View File

@@ -0,0 +1,15 @@
package querybuildertypesv5
import "context"
// filterIntentKey is the context key used to mark that usage is for a filter (WHERE) clause
type filterIntentKey struct{}
func WithFilterIntent(ctx context.Context) context.Context {
return context.WithValue(ctx, filterIntentKey{}, true)
}
func IsFilterIntent(ctx context.Context) bool {
v, ok := ctx.Value(filterIntentKey{}).(bool)
return ok && v
}