mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-03 08:33:26 +00:00
* feat: has JSON QB * fix: tests expected queries and values * fix: ignored .vscode in gitignore * fix: tests GroupBy * revert: gitignore change * fix: build json plans in metadata * fix: empty filteredArrays condition * fix: tests * fix: tests * fix: json qb test fix * fix: review based on tushar * fix: changes based on review from Srikanth * fix: remove unnecessary bool checking * fix: removed comment * chore: var renamed * fix: merge conflict * test: fix * fix: tests * fix: go test flakiness --------- Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
363 lines
15 KiB
Go
363 lines
15 KiB
Go
package telemetrylogs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
|
"github.com/SigNoz/signoz-otel-collector/utils"
|
|
"github.com/SigNoz/signoz/pkg/errors"
|
|
"github.com/SigNoz/signoz/pkg/querybuilder"
|
|
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
|
"github.com/huandu/go-sqlbuilder"
|
|
|
|
"golang.org/x/exp/maps"
|
|
)
|
|
|
|
var (
|
|
logsV2Columns = map[string]*schema.Column{
|
|
"ts_bucket_start": {Name: "ts_bucket_start", Type: schema.ColumnTypeUInt64},
|
|
"resource_fingerprint": {Name: "resource_fingerprint", Type: schema.ColumnTypeString},
|
|
|
|
"timestamp": {Name: "timestamp", Type: schema.ColumnTypeUInt64},
|
|
"observed_timestamp": {Name: "observed_timestamp", Type: schema.ColumnTypeUInt64},
|
|
"id": {Name: "id", Type: schema.ColumnTypeString},
|
|
"trace_id": {Name: "trace_id", Type: schema.ColumnTypeString},
|
|
"span_id": {Name: "span_id", Type: schema.ColumnTypeString},
|
|
"trace_flags": {Name: "trace_flags", Type: schema.ColumnTypeUInt32},
|
|
"severity_text": {Name: "severity_text", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
|
|
"severity_number": {Name: "severity_number", Type: schema.ColumnTypeUInt8},
|
|
"body": {Name: "body", Type: schema.ColumnTypeString},
|
|
LogsV2BodyJSONColumn: {Name: LogsV2BodyJSONColumn, Type: schema.JSONColumnType{
|
|
MaxDynamicTypes: utils.ToPointer(uint(32)),
|
|
MaxDynamicPaths: utils.ToPointer(uint(0)),
|
|
}},
|
|
LogsV2BodyPromotedColumn: {Name: LogsV2BodyPromotedColumn, Type: schema.JSONColumnType{}},
|
|
"attributes_string": {Name: "attributes_string", Type: schema.MapColumnType{
|
|
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
|
ValueType: schema.ColumnTypeString,
|
|
}},
|
|
"attributes_number": {Name: "attributes_number", Type: schema.MapColumnType{
|
|
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
|
ValueType: schema.ColumnTypeFloat64,
|
|
}},
|
|
"attributes_bool": {Name: "attributes_bool", Type: schema.MapColumnType{
|
|
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
|
ValueType: schema.ColumnTypeBool,
|
|
}},
|
|
"resources_string": {Name: "resources_string", Type: schema.MapColumnType{
|
|
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
|
ValueType: schema.ColumnTypeString,
|
|
}},
|
|
"resource": {Name: "resource", Type: schema.JSONColumnType{}},
|
|
"scope_name": {Name: "scope_name", Type: schema.ColumnTypeString},
|
|
"scope_version": {Name: "scope_version", Type: schema.ColumnTypeString},
|
|
"scope_string": {Name: "scope_string", Type: schema.MapColumnType{
|
|
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
|
ValueType: schema.ColumnTypeString,
|
|
}},
|
|
}
|
|
)
|
|
|
|
// fieldMapper resolves telemetry field keys to columns and SQL expressions
// using the logsV2Columns table. It carries no state.
type fieldMapper struct{}
|
|
|
|
func NewFieldMapper() qbtypes.FieldMapper {
|
|
return &fieldMapper{}
|
|
}
|
|
|
|
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
|
|
switch key.FieldContext {
|
|
case telemetrytypes.FieldContextResource:
|
|
return logsV2Columns["resource"], nil
|
|
case telemetrytypes.FieldContextScope:
|
|
switch key.Name {
|
|
case "name", "scope.name", "scope_name":
|
|
return logsV2Columns["scope_name"], nil
|
|
case "version", "scope.version", "scope_version":
|
|
return logsV2Columns["scope_version"], nil
|
|
}
|
|
return logsV2Columns["scope_string"], nil
|
|
case telemetrytypes.FieldContextAttribute:
|
|
switch key.FieldDataType {
|
|
case telemetrytypes.FieldDataTypeString:
|
|
return logsV2Columns["attributes_string"], nil
|
|
case telemetrytypes.FieldDataTypeInt64, telemetrytypes.FieldDataTypeFloat64, telemetrytypes.FieldDataTypeNumber:
|
|
return logsV2Columns["attributes_number"], nil
|
|
case telemetrytypes.FieldDataTypeBool:
|
|
return logsV2Columns["attributes_bool"], nil
|
|
}
|
|
case telemetrytypes.FieldContextBody:
|
|
// Body context is for JSON body fields
|
|
// Use body_json if feature flag is enabled
|
|
if querybuilder.BodyJSONQueryEnabled {
|
|
return logsV2Columns[LogsV2BodyJSONColumn], nil
|
|
}
|
|
// Fall back to legacy body column
|
|
return logsV2Columns["body"], nil
|
|
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextUnspecified:
|
|
col, ok := logsV2Columns[key.Name]
|
|
if !ok {
|
|
// check if the key has body JSON search
|
|
if strings.HasPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix) {
|
|
// Use body_json if feature flag is enabled and we have a body condition builder
|
|
if querybuilder.BodyJSONQueryEnabled {
|
|
return logsV2Columns[LogsV2BodyJSONColumn], nil
|
|
}
|
|
// Fall back to legacy body column
|
|
return logsV2Columns["body"], nil
|
|
}
|
|
return nil, qbtypes.ErrColumnNotFound
|
|
}
|
|
return col, nil
|
|
}
|
|
|
|
return nil, qbtypes.ErrColumnNotFound
|
|
}
|
|
|
|
func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
|
column, err := m.getColumn(ctx, key)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
switch column.Type.GetType() {
|
|
case schema.ColumnTypeEnumJSON:
|
|
// json is only supported for resource context as of now
|
|
switch key.FieldContext {
|
|
case telemetrytypes.FieldContextResource:
|
|
oldColumn := logsV2Columns["resources_string"]
|
|
oldKeyName := fmt.Sprintf("%s['%s']", oldColumn.Name, key.Name)
|
|
|
|
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
|
|
// once clickHouse dependency is updated, we need to check if we can remove it.
|
|
if key.Materialized {
|
|
oldKeyName = telemetrytypes.FieldKeyToMaterializedColumnName(key)
|
|
oldKeyNameExists := telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
|
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, %s==true, %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldKeyNameExists, oldKeyName), nil
|
|
}
|
|
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
|
|
case telemetrytypes.FieldContextBody:
|
|
if key.JSONDataType == nil {
|
|
return "", qbtypes.ErrColumnNotFound
|
|
}
|
|
|
|
if key.KeyNameContainsArray() && !key.JSONDataType.IsArray {
|
|
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "FieldFor not supported for nested fields; only supported for flat paths (e.g. body.status.detail) and paths of Array type: %s(%s)", key.Name, key.FieldDataType)
|
|
}
|
|
|
|
return m.buildFieldForJSON(key)
|
|
default:
|
|
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
|
|
}
|
|
case schema.ColumnTypeEnumLowCardinality:
|
|
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
|
case schema.ColumnTypeEnumString:
|
|
return column.Name, nil
|
|
default:
|
|
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
|
}
|
|
case schema.ColumnTypeEnumString,
|
|
schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
|
|
return column.Name, nil
|
|
case schema.ColumnTypeEnumMap:
|
|
keyType := column.Type.(schema.MapColumnType).KeyType
|
|
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
|
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
|
}
|
|
|
|
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
|
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
|
// a key could have been materialized, if so return the materialized column name
|
|
if key.Materialized {
|
|
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
|
}
|
|
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
|
default:
|
|
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
|
}
|
|
}
|
|
// should not reach here
|
|
return column.Name, nil
|
|
}
|
|
|
|
func (m *fieldMapper) ColumnFor(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
|
|
return m.getColumn(ctx, key)
|
|
}
|
|
|
|
func (m *fieldMapper) ColumnExpressionFor(
|
|
ctx context.Context,
|
|
field *telemetrytypes.TelemetryFieldKey,
|
|
keys map[string][]*telemetrytypes.TelemetryFieldKey,
|
|
) (string, error) {
|
|
|
|
colName, err := m.FieldFor(ctx, field)
|
|
if errors.Is(err, qbtypes.ErrColumnNotFound) {
|
|
// the key didn't have the right context to be added to the query
|
|
// we try to use the context we know of
|
|
keysForField := keys[field.Name]
|
|
if len(keysForField) == 0 {
|
|
// is it a static field?
|
|
if _, ok := logsV2Columns[field.Name]; ok {
|
|
// if it is, attach the column name directly
|
|
field.FieldContext = telemetrytypes.FieldContextLog
|
|
colName, _ = m.FieldFor(ctx, field)
|
|
} else {
|
|
// - the context is not provided
|
|
// - there are not keys for the field
|
|
// - it is not a static field
|
|
// - the next best thing to do is see if there is a typo
|
|
// and suggest a correction
|
|
correction, found := telemetrytypes.SuggestCorrection(field.Name, maps.Keys(keys))
|
|
if found {
|
|
// we found a close match, in the error message send the suggestion
|
|
return "", errors.Wrap(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
|
|
} else {
|
|
// not even a close match, return an error
|
|
return "", errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field `%s` not found", field.Name)
|
|
}
|
|
}
|
|
} else if len(keysForField) == 1 {
|
|
// we have a single key for the field, use it
|
|
colName, _ = m.FieldFor(ctx, keysForField[0])
|
|
} else {
|
|
// select any non-empty value from the keys
|
|
args := []string{}
|
|
for _, key := range keysForField {
|
|
colName, _ = m.FieldFor(ctx, key)
|
|
args = append(args, fmt.Sprintf("toString(%s) != '', toString(%s)", colName, colName))
|
|
}
|
|
colName = fmt.Sprintf("multiIf(%s, NULL)", strings.Join(args, ", "))
|
|
}
|
|
}
|
|
|
|
return fmt.Sprintf("%s AS `%s`", sqlbuilder.Escape(colName), field.Name), nil
|
|
}
|
|
|
|
// buildFieldForJSON builds the field expression for body JSON fields using arrayConcat pattern
|
|
func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
|
plan := key.JSONPlan
|
|
if len(plan) == 0 {
|
|
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
|
|
"Could not find any valid paths for: %s", key.Name)
|
|
}
|
|
|
|
if plan[0].IsTerminal {
|
|
node := plan[0]
|
|
|
|
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
|
|
if key.Materialized {
|
|
if len(plan) < 2 {
|
|
return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
|
|
"plan length is less than 2 for promoted path: %s", key.Name)
|
|
}
|
|
|
|
// promoted column first then body_json column
|
|
// TODO(Piyush): Change this in future for better performance
|
|
expr = fmt.Sprintf("coalesce(%s, %s)",
|
|
fmt.Sprintf("dynamicElement(%s, '%s')", plan[1].FieldPath(), plan[1].TerminalConfig.ElemType.StringValue()),
|
|
expr,
|
|
)
|
|
}
|
|
|
|
return expr, nil
|
|
}
|
|
|
|
// Build arrayConcat pattern directly from the tree structure
|
|
arrayConcatExpr, err := m.buildArrayConcat(plan)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return arrayConcatExpr, nil
|
|
}
|
|
|
|
// buildArrayConcat builds the arrayConcat pattern directly from the tree structure
|
|
func (m *fieldMapper) buildArrayConcat(plan telemetrytypes.JSONAccessPlan) (string, error) {
|
|
if len(plan) == 0 {
|
|
return "", errors.Newf(errors.TypeInternal, CodeGroupByPlanEmpty, "group by plan is empty while building arrayConcat")
|
|
}
|
|
|
|
// Build arrayMap expressions for ALL available branches at the root level.
|
|
// Iterate branches in deterministic order (JSON then Dynamic) so generated SQL
|
|
// is stable across environments; map iteration order is random in Go.
|
|
var arrayMapExpressions []string
|
|
for _, node := range plan {
|
|
for _, branchType := range node.BranchesInOrder() {
|
|
expr, err := m.buildArrayMap(node, branchType)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
arrayMapExpressions = append(arrayMapExpressions, expr)
|
|
}
|
|
}
|
|
if len(arrayMapExpressions) == 0 {
|
|
return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat")
|
|
}
|
|
|
|
// Build the arrayConcat expression
|
|
arrayConcatExpr := fmt.Sprintf("arrayConcat(%s)", strings.Join(arrayMapExpressions, ", "))
|
|
|
|
// Wrap with arrayFlatten
|
|
arrayFlattenExpr := fmt.Sprintf("arrayFlatten(%s)", arrayConcatExpr)
|
|
|
|
return arrayFlattenExpr, nil
|
|
}
|
|
|
|
// buildArrayMap builds the arrayMap expression for a specific branch, handling all sub-branches
|
|
func (m *fieldMapper) buildArrayMap(currentNode *telemetrytypes.JSONAccessNode, branchType telemetrytypes.JSONAccessBranchType) (string, error) {
|
|
if currentNode == nil {
|
|
return "", errors.Newf(errors.TypeInternal, CodeCurrentNodeNil, "current node is nil while building arrayMap")
|
|
}
|
|
|
|
childNode := currentNode.Branches[branchType]
|
|
if childNode == nil {
|
|
return "", errors.Newf(errors.TypeInternal, CodeChildNodeNil, "child node is nil while building arrayMap")
|
|
}
|
|
|
|
// Build the array expression for this level
|
|
var arrayExpr string
|
|
if branchType == telemetrytypes.BranchJSON {
|
|
// Array(JSON) branch
|
|
arrayExpr = fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')",
|
|
currentNode.FieldPath(), currentNode.MaxDynamicTypes, currentNode.MaxDynamicPaths)
|
|
} else {
|
|
// Array(Dynamic) branch - filter for JSON objects
|
|
dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", currentNode.FieldPath())
|
|
arrayExpr = fmt.Sprintf("arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr)
|
|
}
|
|
|
|
// If this is the terminal level, return the simple arrayMap
|
|
if childNode.IsTerminal {
|
|
dynamicElementExpr := fmt.Sprintf("dynamicElement(%s, '%s')", childNode.FieldPath(),
|
|
childNode.TerminalConfig.ElemType.StringValue(),
|
|
)
|
|
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), dynamicElementExpr, arrayExpr), nil
|
|
}
|
|
|
|
// For non-terminal nodes, we need to handle ALL possible branches at the next level.
|
|
// Use deterministic branch order so generated SQL is stable across environments.
|
|
var nestedExpressions []string
|
|
for _, branchType := range childNode.BranchesInOrder() {
|
|
expr, err := m.buildArrayMap(childNode, branchType)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
nestedExpressions = append(nestedExpressions, expr)
|
|
}
|
|
|
|
// If we have multiple nested expressions, we need to concat them
|
|
var nestedExpr string
|
|
if len(nestedExpressions) == 1 {
|
|
nestedExpr = nestedExpressions[0]
|
|
} else if len(nestedExpressions) > 1 {
|
|
nestedExpr = fmt.Sprintf("arrayConcat(%s)", strings.Join(nestedExpressions, ", "))
|
|
} else {
|
|
return "", errors.Newf(errors.TypeInternal, CodeNestedExpressionsEmpty, "nested expressions are empty while building arrayMap")
|
|
}
|
|
|
|
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), nestedExpr, arrayExpr), nil
|
|
}
|