mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-12 12:32:04 +00:00
Compare commits
6 Commits
test/uplot
...
tvats-impr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92249ddbb5 | ||
|
|
f6d8ccfc69 | ||
|
|
3025f11ea1 | ||
|
|
cc6413349e | ||
|
|
4092913d39 | ||
|
|
642c75d002 |
@@ -34,7 +34,7 @@ const (
|
||||
LogsV2AttributesBoolColumn = "attributes_bool"
|
||||
LogsV2ResourcesStringColumn = "resources_string"
|
||||
LogsV2ScopeStringColumn = "scope_string"
|
||||
|
||||
LogsV2ResourceColumn = "resource"
|
||||
BodyJSONColumnPrefix = constants.BodyJSONColumnPrefix
|
||||
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
|
||||
ArraySep = jsontypeexporter.ArraySeparator
|
||||
|
||||
@@ -100,6 +100,11 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
// FieldFor returns the column expression for a given TelemetryFieldKey,
|
||||
// Example: "attributes_string['http.method']"
|
||||
//
|
||||
// Both FieldContext and FieldDataType must be specified in the TelemetryFieldKey.
|
||||
// Returns an error if the field cannot be resolved to a valid column expression.
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
column, err := m.getColumn(ctx, key)
|
||||
if err != nil {
|
||||
@@ -155,51 +160,107 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
|
||||
return column.Name, nil
|
||||
}
|
||||
|
||||
// ColumnFor returns the schema.Column metadata for a given TelemetryFieldKey.
|
||||
//
|
||||
// Both FieldContext and FieldDataType must be specified in the TelemetryFieldKey.
|
||||
// Returns an error if the field cannot be resolved to a valid column.
|
||||
func (m *fieldMapper) ColumnFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
|
||||
return m.getColumn(ctx, key)
|
||||
}
|
||||
|
||||
// ColumnExpressionFor returns an aliased expression for a given TelemetryFieldKey,
|
||||
// Example: "attributes_string['http.method'] AS `http.method`".
|
||||
//
|
||||
// It handles cases where the field key lacks sufficient context by attempting to
|
||||
// resolve the appropriate context from the provided keys map. If multiple contexts
|
||||
// are found for the same field name, it constructs a multiIf expression to select
|
||||
// the first non-empty value.
|
||||
//
|
||||
// This method is a wrapper over FieldFor to provide better error messages and aliasing.
|
||||
//
|
||||
// If no context is found, it returns an error optionally suggests a correction if a close match exists,
|
||||
func (m *fieldMapper) ColumnExpressionFor(
|
||||
ctx context.Context,
|
||||
field *telemetrytypes.TelemetryFieldKey,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
) (string, error) {
|
||||
|
||||
populateMissingFieldContextAndDataType(keys, field)
|
||||
|
||||
colName, err := m.FieldFor(ctx, field)
|
||||
if errors.Is(err, qbtypes.ErrColumnNotFound) {
|
||||
// the key didn't have the right context to be added to the query
|
||||
// we try to use the context we know of
|
||||
keysForField := keys[field.Name]
|
||||
if len(keysForField) == 0 {
|
||||
// is it a static field?
|
||||
if _, ok := logsV2Columns[field.Name]; ok {
|
||||
// if it is, attach the column name directly
|
||||
field.FieldContext = telemetrytypes.FieldContextLog
|
||||
colName, _ = m.FieldFor(ctx, field)
|
||||
// no keys found for the field, suggest correction if possible
|
||||
correction, found := telemetrytypes.SuggestCorrection(field.Name, maps.Keys(keys))
|
||||
if found {
|
||||
// we found a close match, in the error message send the suggestion
|
||||
return "", errors.Wrap(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
|
||||
} else {
|
||||
// - the context is not provided
|
||||
// - there are not keys for the field
|
||||
// - it is not a static field
|
||||
// - the next best thing to do is see if there is a typo
|
||||
// and suggest a correction
|
||||
correction, found := telemetrytypes.SuggestCorrection(field.Name, maps.Keys(keys))
|
||||
if found {
|
||||
// we found a close match, in the error message send the suggestion
|
||||
return "", errors.Wrap(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
|
||||
} else {
|
||||
// not even a close match, return an error
|
||||
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field `%s` not found", field.Name)
|
||||
}
|
||||
// not even a close match, return an error
|
||||
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field `%s` not found", field.Name)
|
||||
}
|
||||
} else if len(keysForField) == 1 {
|
||||
// we have a single key for the field, use it
|
||||
colName, _ = m.FieldFor(ctx, keysForField[0])
|
||||
// This case is expected to be handled by populateMissingFieldContextAndDataType.
|
||||
// If we reach here, it indicates an internal inconsistency in field preprocessing.
|
||||
return "", errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "unexpected single key for field `%s`", field.Name)
|
||||
} else {
|
||||
// select any non-empty value from the keys
|
||||
args := []string{}
|
||||
for _, key := range keysForField {
|
||||
colName, _ = m.FieldFor(ctx, key)
|
||||
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
|
||||
// check if either of context or data type matches
|
||||
if field.FieldContext != telemetrytypes.FieldContextUnspecified && key.FieldContext != field.FieldContext {
|
||||
continue
|
||||
}
|
||||
if field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified && key.FieldDataType != field.FieldDataType {
|
||||
continue
|
||||
}
|
||||
|
||||
if key.Materialized {
|
||||
colName = telemetrytypes.FieldKeyToMaterializedColumnName(key)
|
||||
colNameExists := telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
args = append(args, fmt.Sprintf("%s==true, %s", colNameExists, colName))
|
||||
}
|
||||
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
args = append(args, fmt.Sprintf("%s.`%s` IS NOT NULL, %s.`%s`::String", LogsV2ResourceColumn, key.Name, LogsV2ResourceColumn, key.Name))
|
||||
args = append(args, fmt.Sprintf("mapContains(%s, '%s'), %s['%s']", LogsV2ResourcesStringColumn, key.Name, LogsV2ResourcesStringColumn, key.Name))
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
column, err := m.getColumn(ctx, key)
|
||||
// should not error out as we have created this key
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, err.Error())
|
||||
}
|
||||
args = append(args, fmt.Sprintf("mapContains(%s, '%s'), %s['%s']", column.Name, key.Name, column.Name, key.Name))
|
||||
|
||||
case telemetrytypes.FieldContextScope:
|
||||
column, err := m.getColumn(ctx, key)
|
||||
// should not error out as we have created this key
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, err.Error())
|
||||
}
|
||||
// Check if the column is a map type or a direct string column
|
||||
if column.Type == schema.ColumnTypeString {
|
||||
// For direct string columns like scope_name or scope_version
|
||||
args = append(args, fmt.Sprintf("%s != '', %s", column.Name, column.Name))
|
||||
} else {
|
||||
// For map columns like scope_string, use mapContains
|
||||
args = append(args, fmt.Sprintf("mapContains(%s, '%s'), %s['%s']", column.Name, key.Name, column.Name, key.Name))
|
||||
}
|
||||
|
||||
default:
|
||||
colName, err := m.FieldFor(ctx, key)
|
||||
// should not error out as we have created this key
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, errors.TypeInternal, errors.CodeInternal, err.Error())
|
||||
}
|
||||
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
|
||||
}
|
||||
}
|
||||
if len(args) == 0 {
|
||||
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field `%s` not found", field.Name)
|
||||
}
|
||||
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))
|
||||
}
|
||||
@@ -207,3 +268,79 @@ func (m *fieldMapper) ColumnExpressionFor(
|
||||
|
||||
return fmt.Sprintf("%s AS `%s`", sqlbuilder.Escape(colName), field.Name), nil
|
||||
}
|
||||
|
||||
// populateMissingFieldContextAndDataType tries to populate missing FieldContext and FieldDataType from the keys map.
|
||||
func populateMissingFieldContextAndDataType(keys map[string][]*telemetrytypes.TelemetryFieldKey, field *telemetrytypes.TelemetryFieldKey) {
|
||||
|
||||
if field.FieldContext != telemetrytypes.FieldContextUnspecified && field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
|
||||
// both context and data type are specified, nothing to do
|
||||
return
|
||||
}
|
||||
keysForField := keys[field.Name]
|
||||
if len(keysForField) == 0 {
|
||||
// Check if it's a top level static field
|
||||
if key, ok := logsV2Columns[field.Name]; ok {
|
||||
// if it is, populate context and data type
|
||||
field.FieldContext = telemetrytypes.FieldContextLog
|
||||
// infer data type from column type
|
||||
switch key.Type {
|
||||
case schema.ColumnTypeString,
|
||||
schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}:
|
||||
field.FieldDataType = telemetrytypes.FieldDataTypeString
|
||||
case schema.ColumnTypeUInt64,
|
||||
schema.ColumnTypeUInt32,
|
||||
schema.ColumnTypeUInt8,
|
||||
schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeFloat64,
|
||||
}:
|
||||
field.FieldDataType = telemetrytypes.FieldDataTypeNumber
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeBool,
|
||||
}:
|
||||
field.FieldDataType = telemetrytypes.FieldDataTypeBool
|
||||
}
|
||||
return
|
||||
}
|
||||
// Check if it's a body JSON search
|
||||
if strings.HasPrefix(field.Name, BodyJSONStringSearchPrefix) {
|
||||
field.FieldContext = telemetrytypes.FieldContextLog
|
||||
field.FieldDataType = telemetrytypes.FieldDataTypeString
|
||||
return
|
||||
}
|
||||
// no keys for the field, nothing to do
|
||||
return
|
||||
} else if len(keysForField) == 1 {
|
||||
// we have a single key for the field, use it
|
||||
field.FieldContext = keysForField[0].FieldContext
|
||||
field.FieldDataType = keysForField[0].FieldDataType
|
||||
} else {
|
||||
// multiple keys found with same name,
|
||||
// filter out the ones which match the provided context or data type
|
||||
filteredKeysForField := make([]*telemetrytypes.TelemetryFieldKey, 0, len(keysForField))
|
||||
for _, key := range keysForField {
|
||||
// 1. if both context and data type are unspecified, consider this key
|
||||
// 2. if context matches, consider this key
|
||||
// 3. if data type matches, consider this key
|
||||
if (field.FieldContext == telemetrytypes.FieldContextUnspecified && field.FieldDataType == telemetrytypes.FieldDataTypeUnspecified) ||
|
||||
key.FieldContext == field.FieldContext ||
|
||||
key.FieldDataType == field.FieldDataType {
|
||||
filteredKeysForField = append(filteredKeysForField, key)
|
||||
}
|
||||
}
|
||||
|
||||
// if we have a single match, use it
|
||||
if len(filteredKeysForField) == 1 {
|
||||
field.FieldContext = filteredKeysForField[0].FieldContext
|
||||
field.FieldDataType = filteredKeysForField[0].FieldDataType
|
||||
return
|
||||
}
|
||||
// Should we give priority to top level fields here?
|
||||
// unable to disambiguate, for now leave as is
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,13 +5,14 @@ import (
|
||||
"testing"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGetColumn(t *testing.T) {
|
||||
func TestColumnFor(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
testCases := []struct {
|
||||
@@ -180,7 +181,7 @@ func TestGetColumn(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetFieldKeyName(t *testing.T) {
|
||||
func TestFieldFor(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
testCases := []struct {
|
||||
@@ -208,6 +209,15 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
expectedResult: "attributes_string['user.id']",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - dataType missing attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "user.id",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
},
|
||||
expectedResult: "",
|
||||
expectedError: qbtypes.ErrColumnNotFound,
|
||||
},
|
||||
{
|
||||
name: "Map column type - number attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
@@ -273,3 +283,242 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestColumnExpressionFor(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey
|
||||
expectedResult string
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
name: "Simple column type - timestamp",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "timestamp AS `timestamp`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Simple column type - timestamp with explicit datatype",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "timestamp AS `timestamp`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - string attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "user.id",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "attributes_string['user.id'] AS `user.id`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - number attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.size",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "attributes_number['request.size'] AS `request.size`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - bool attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.success",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "attributes_bool['request.success'] AS `request.success`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - resource attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - resource attribute - materialized",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$$$name_exists`==true, `resource_string_service$$$$name`, NULL) AS `service.name`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Scope field - scope name",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "name",
|
||||
FieldContext: telemetrytypes.FieldContextScope,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "scope_name AS `name`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Field with missing context fallback to keys",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.method",
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"http.method": {
|
||||
{
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: "attributes_string['http.method'] AS `http.method`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Field with missing context fallback to keys with multiple context options",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.status_code",
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"http.status_code": {
|
||||
{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: "multiIf(mapContains(attributes_string, 'http.status_code'), attributes_string['http.status_code'], resource.`http.status_code` IS NOT NULL, resource.`http.status_code`::String, mapContains(resources_string, 'http.status_code'), resources_string['http.status_code'], NULL) AS `http.status_code`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Field with missing context fallback to keys with multiple dataType with data type provided",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.status_code",
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"http.status_code": {
|
||||
{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeFloat64,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: "attributes_string['http.status_code'] AS `http.status_code`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Field with missing context fallback to keys with multiple dataType",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.status_code",
|
||||
},
|
||||
keys: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"http.status_code": {
|
||||
{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeFloat64,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: "multiIf(mapContains(attributes_string, 'http.status_code'), attributes_string['http.status_code'], mapContains(attributes_number, 'http.status_code'), attributes_number['http.status_code'], NULL) AS `http.status_code`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Field with missing context fallback to keys with multiple context and dataType options",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.status_code",
|
||||
},
|
||||
keys: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"http.status_code": {
|
||||
{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
{
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeFloat64,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: "multiIf(mapContains(attributes_string, 'http.status_code'), attributes_string['http.status_code'], resource.`http.status_code` IS NOT NULL, resource.`http.status_code`::String, mapContains(resources_string, 'http.status_code'), resources_string['http.status_code'], NULL) AS `http.status_code`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Non-existent column",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "nonexistent_field",
|
||||
},
|
||||
keys: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"existent_field": {
|
||||
{
|
||||
Name: "existent_field",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: "",
|
||||
expectedError: errors.Wrap(qbtypes.ErrColumnNotFound, errors.TypeInvalidInput, errors.CodeInvalidInput, "did you mean: 'existent_field'?"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
result, err := fm.ColumnExpressionFor(ctx, &tc.key, tc.keys)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedResult, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,6 +91,9 @@ func (b *logQueryStatementBuilder) Build(
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported request type: %s", requestType)
|
||||
}
|
||||
|
||||
// getKeySelectors extracts all TelemetryFieldKey used in the given query.
|
||||
//
|
||||
// It includes keys from aggregations, filters, group by, and order by clauses.
|
||||
func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []*telemetrytypes.FieldKeySelector {
|
||||
var keySelectors []*telemetrytypes.FieldKeySelector
|
||||
|
||||
@@ -120,6 +123,15 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]) []
|
||||
})
|
||||
}
|
||||
|
||||
for idx := range query.SelectFields {
|
||||
keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
|
||||
Name: query.SelectFields[idx].Name,
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: query.SelectFields[idx].FieldContext,
|
||||
FieldDataType: query.SelectFields[idx].FieldDataType,
|
||||
})
|
||||
}
|
||||
|
||||
for idx := range keySelectors {
|
||||
keySelectors[idx].Signal = telemetrytypes.SignalLogs
|
||||
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
|
||||
|
||||
@@ -93,7 +93,7 @@ func (m *fieldMapper) ColumnFor(ctx context.Context, key *telemetrytypes.Telemet
|
||||
func (m *fieldMapper) ColumnExpressionFor(
|
||||
ctx context.Context,
|
||||
field *telemetrytypes.TelemetryFieldKey,
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
_ map[string][]*telemetrytypes.TelemetryFieldKey,
|
||||
) (string, error) {
|
||||
|
||||
colName, err := m.FieldFor(ctx, field)
|
||||
|
||||
@@ -218,3 +218,124 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestColumnExpressionFor(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey
|
||||
expectedResult string
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
name: "Simple column type - metric_name",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "metric_name",
|
||||
FieldContext: telemetrytypes.FieldContextMetric,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "metric_name AS `metric_name`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - string label",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "user.id",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "JSONExtractString(labels, 'user.id') AS `user.id`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - number label",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.size",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "JSONExtractString(labels, 'request.size') AS `request.size`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - bool label",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.success",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "JSONExtractString(labels, 'request.success') AS `request.success`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - resource label",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "JSONExtractString(labels, 'service.name') AS `service.name`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Field with duplicate contexts in keys",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "host.name",
|
||||
},
|
||||
keys: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"host.name": {
|
||||
{
|
||||
Name: "host.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
{
|
||||
Name: "host.name",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: "JSONExtractString(labels, 'host.name') AS `host.name`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Field with missing context fallback to keys",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "host.name",
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"host.name": {
|
||||
{
|
||||
Name: "host.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: "JSONExtractString(labels, 'host.name') AS `host.name`",
|
||||
expectedError: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
result, err := fm.ColumnExpressionFor(ctx, &tc.key, tc.keys)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedResult, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,3 +103,114 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestColumnExpressionFor(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
keys map[string][]*telemetrytypes.TelemetryFieldKey
|
||||
expectedResult string
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
name: "Simple column type - timestamp",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "timestamp AS `timestamp`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - string attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "user.id",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "attributes_string['user.id'] AS `user.id`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - number attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.size",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "attributes_number['request.size'] AS `request.size`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - bool attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.success",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "attributes_bool['request.success'] AS `request.success`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - resource attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - resource attribute - materialized",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "deployment.environment",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
},
|
||||
keys: nil,
|
||||
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$$$environment_exists`==true, `resource_string_deployment$$$$environment`, NULL) AS `deployment.environment`",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Field with missing context fallback to keys",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "http.method",
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
keys: map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"http.method": {
|
||||
{
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: "attributes_string['http.method'] AS `http.method`",
|
||||
expectedError: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
result, err := fm.ColumnExpressionFor(ctx, &tc.key, tc.keys)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedResult, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -660,7 +660,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "list query with deprecated filter field",
|
||||
name: "list query with deprecated filter field",
|
||||
requestType: qbtypes.RequestTypeTrace,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
|
||||
@@ -105,6 +105,20 @@ func GetFieldKeyFromKeyText(key string) TelemetryFieldKey {
|
||||
return fieldKeySelector
|
||||
}
|
||||
|
||||
func GetKeyTextFromFieldKey(key TelemetryFieldKey) string {
|
||||
var sb strings.Builder
|
||||
if key.FieldContext != FieldContextUnspecified {
|
||||
sb.WriteString(key.FieldContext.StringValue())
|
||||
sb.WriteString(".")
|
||||
}
|
||||
sb.WriteString(key.Name)
|
||||
if key.FieldDataType != FieldDataTypeUnspecified {
|
||||
sb.WriteString(":")
|
||||
sb.WriteString(key.FieldDataType.StringValue())
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func FieldKeyToMaterializedColumnName(key *TelemetryFieldKey) string {
|
||||
return fmt.Sprintf("`%s_%s_%s`", key.FieldContext.String, fieldDataTypes[key.FieldDataType.StringValue()].StringValue(), strings.ReplaceAll(key.Name, ".", "$$"))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user