Compare commits

...

21 Commits

Author SHA1 Message Date
Piyush Singariya
77d72cabd0 feat: router 2026-01-29 13:09:01 +05:30
Piyush Singariya
57c51f070c fix: merge json body columns together 2026-01-28 14:36:15 +05:30
Piyush Singariya
36becfc7a2 fix: removed comment 2026-01-27 13:20:52 +05:30
Piyush Singariya
8e71de09f3 fix: remove unnecessary bool checking 2026-01-27 13:16:30 +05:30
Piyush Singariya
56de92de73 fix: changes based on review from Srikanth 2026-01-27 13:12:32 +05:30
Piyush Singariya
62b10f8e77 Merge branch 'main' into has-jsonqb 2026-01-27 10:04:32 +05:30
Piyush Singariya
20b53d7856 fix: review based on tushar 2026-01-27 10:04:15 +05:30
Piyush Singariya
8f2c506304 fix: json qb test fix 2026-01-22 22:00:06 +05:30
Srikanth Chekuri
7b5b9027dd Merge branch 'main' into has-jsonqb 2026-01-22 20:19:39 +05:30
Piyush Singariya
b77f97fcb7 fix: tests 2026-01-22 17:24:26 +05:30
Piyush Singariya
62942a4162 fix: tests 2026-01-22 15:47:20 +05:30
Piyush Singariya
349bbbbf1d Merge branch 'main' into has-jsonqb 2026-01-22 12:36:45 +05:30
Piyush Singariya
1966a7a5f6 fix: empty filteredArrays condition 2026-01-22 12:36:29 +05:30
Piyush Singariya
a4eed9ff13 fix: build json plans in metadata 2026-01-22 12:33:51 +05:30
Piyush Singariya
24d1ee33b5 revert: gitignore change 2026-01-22 10:39:02 +05:30
Srikanth Chekuri
3402203021 Merge branch 'main' into has-jsonqb 2026-01-21 13:47:39 +05:30
Piyush Singariya
e8e4897cc8 fix: tests GroupBy 2026-01-20 12:10:44 +05:30
Piyush Singariya
96fb88aaee fix: ignored .vscode in gitignore 2026-01-20 11:47:34 +05:30
Piyush Singariya
5a00e6c2cd Merge branch 'main' into has-jsonqb 2026-01-20 11:44:32 +05:30
Piyush Singariya
e2500cff7d fix: tests expected queries and values 2026-01-20 11:40:56 +05:30
Piyush Singariya
4864c3bc37 feat: has JSON QB 2026-01-20 11:23:50 +05:30
30 changed files with 1309 additions and 633 deletions

1
.gitignore vendored
View File

@@ -57,7 +57,6 @@ bin/
.local/
*/query-service/queries.active
ee/query-service/db
# e2e
e2e/node_modules/

5
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
github.com/SigNoz/signoz-otel-collector v0.129.10-rc.9
github.com/SigNoz/signoz-otel-collector v0.129.13-rc.2
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/bytedance/sonic v1.14.1
@@ -363,3 +363,6 @@ require (
)
replace github.com/expr-lang/expr => github.com/SigNoz/expr v1.17.7-beta
replace (
github.com/SigNoz/signoz-otel-collector v0.129.13-rc.2 => ../signoz-otel-collector
)

2
go.sum
View File

@@ -108,6 +108,8 @@ github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkb
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
github.com/SigNoz/signoz-otel-collector v0.129.10-rc.9 h1:WmYDSSwzyW2yiJ3tPq5AFdjsrz3NBdtPkygtFKOsACw=
github.com/SigNoz/signoz-otel-collector v0.129.10-rc.9/go.mod h1:4eJCRUd/P4OiCHXvGYZK8q6oyBVGJFVj/G6qKSoN/TQ=
github.com/SigNoz/signoz-otel-collector v0.129.13-rc.2 h1:225Seh4b0tBavJ7dRTqX2BNG0egEXc4polH4Di5LJJA=
github.com/SigNoz/signoz-otel-collector v0.129.13-rc.2/go.mod h1:4eJCRUd/P4OiCHXvGYZK8q6oyBVGJFVj/G6qKSoN/TQ=
github.com/Yiling-J/theine-go v0.6.2 h1:1GeoXeQ0O0AUkiwj2S9Jc0Mzx+hpqzmqsJ4kIC4M9AY=
github.com/Yiling-J/theine-go v0.6.2/go.mod h1:08QpMa5JZ2pKN+UJCRrCasWYO1IKCdl54Xa836rpmDU=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=

View File

@@ -105,7 +105,7 @@ func newProvider(
// Create log statement builder
logFieldMapper := telemetrylogs.NewFieldMapper()
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper, telemetryMetadataStore)
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
logResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
settings,
resourceFilterFieldMapper,

View File

@@ -690,6 +690,22 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
v.errors = append(v.errors, fmt.Sprintf("function `%s` expects key parameter to be a field key", functionName))
return ""
}
// filter arrays from keys
if BodyJSONQueryEnabled && functionName != "hasToken" {
filteredKeys := []*telemetrytypes.TelemetryFieldKey{}
for _, key := range keys {
if key.FieldDataType.IsArray() {
filteredKeys = append(filteredKeys, key)
}
}
if len(filteredKeys) == 0 {
v.errors = append(v.errors, fmt.Sprintf("function `%s` expects key parameter to be an array field; no array fields found", functionName))
return ""
}
keys = filteredKeys
}
value := params[1:]
var conds []string
for _, key := range keys {
@@ -716,7 +732,16 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
} else {
// this is that all other functions only support array fields
if key.FieldContext == telemetrytypes.FieldContextBody {
fieldName, _ = v.jsonKeyToKey(context.Background(), key, qbtypes.FilterOperatorUnknown, value)
var err error
if BodyJSONQueryEnabled {
fieldName, err = v.fieldMapper.FieldFor(context.Background(), key)
if err != nil {
v.errors = append(v.errors, fmt.Sprintf("failed to get field name for key %s: %s", key.Name, err.Error()))
return ""
}
} else {
fieldName, _ = v.jsonKeyToKey(context.Background(), key, qbtypes.FilterOperatorUnknown, value)
}
} else {
// TODO(add docs for json body search)
if v.mainErrorURL == "" {

View File

@@ -0,0 +1,144 @@
package telemetrylogs
import (
"context"
"fmt"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
// CompositeColumnStrategy handles condition building for composite columns,
// which are logical fields that span multiple physical columns.
// Examples: "body" field that can be body, body_json, or body_json_promoted.
type CompositeColumnStrategy struct {
fm qbtypes.FieldMapper
}
func NewCompositeColumnStrategy(fm qbtypes.FieldMapper) *CompositeColumnStrategy {
return &CompositeColumnStrategy{fm: fm}
}
func (s *CompositeColumnStrategy) CanHandle(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, column *schema.Column) bool {
// Check if this is a composite column by looking it up in the logical field registry
_, exists := logicalFieldRegistry[key.Name]
return exists
}
func (s *CompositeColumnStrategy) BuildCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
metadata, exists := logicalFieldRegistry[key.Name]
if !exists {
// Should not happen if CanHandle returned true, but handle gracefully
return "", qbtypes.ErrColumnNotFound
}
// Check if there's a custom condition handler for this operator
if metadata.Conditions != nil {
if handler, ok := metadata.Conditions[operator]; ok {
return handler(sb, value)
}
}
// Fallback: use the expression from the logical field and apply standard operators
expr := metadata.Expr()
// Format value for string search operators
if operator.IsStringSearchOperator() {
value = querybuilder.FormatValueForContains(value)
}
// Apply standard operators using the logical field expression
return s.buildConditionForExpression(sb, expr, operator, value)
}
// buildConditionForExpression builds conditions for a given expression string.
// This is used by composite columns that have their own expressions.
func (s *CompositeColumnStrategy) buildConditionForExpression(
sb *sqlbuilder.SelectBuilder,
expr string,
operator qbtypes.FilterOperator,
value any,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorEqual:
return sb.E(expr, value), nil
case qbtypes.FilterOperatorNotEqual:
return sb.NE(expr, value), nil
case qbtypes.FilterOperatorGreaterThan:
return sb.G(expr, value), nil
case qbtypes.FilterOperatorGreaterThanOrEq:
return sb.GE(expr, value), nil
case qbtypes.FilterOperatorLessThan:
return sb.LT(expr, value), nil
case qbtypes.FilterOperatorLessThanOrEq:
return sb.LE(expr, value), nil
case qbtypes.FilterOperatorLike:
return sb.Like(expr, value), nil
case qbtypes.FilterOperatorNotLike:
return sb.NotLike(expr, value), nil
case qbtypes.FilterOperatorILike:
return sb.ILike(expr, value), nil
case qbtypes.FilterOperatorNotILike:
return sb.NotILike(expr, value), nil
case qbtypes.FilterOperatorContains:
return sb.ILike(expr, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorNotContains:
return sb.NotILike(expr, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorRegexp:
return fmt.Sprintf(`match(%s, %s)`, sqlbuilder.Escape(expr), sb.Var(value)), nil
case qbtypes.FilterOperatorNotRegexp:
return fmt.Sprintf(`NOT match(%s, %s)`, sqlbuilder.Escape(expr), sb.Var(value)), nil
case qbtypes.FilterOperatorBetween:
values, ok := value.([]any)
if !ok || len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.Between(expr, values[0], values[1]), nil
case qbtypes.FilterOperatorNotBetween:
values, ok := value.([]any)
if !ok || len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.NotBetween(expr, values[0], values[1]), nil
case qbtypes.FilterOperatorIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
conditions := []string{}
for _, v := range values {
conditions = append(conditions, sb.E(expr, v))
}
return sb.Or(conditions...), nil
case qbtypes.FilterOperatorNotIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
conditions := []string{}
for _, v := range values {
conditions = append(conditions, sb.NE(expr, v))
}
return sb.And(conditions...), nil
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
// For composite columns, exists/not exists should be handled by the custom conditions
// If we reach here, it means no custom handler was provided
// We'll use a simple IS NOT NULL check on the expression
if operator == qbtypes.FilterOperatorExists {
return fmt.Sprintf("%s IS NOT NULL", expr), nil
}
return fmt.Sprintf("%s IS NULL", expr), nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported operator: %v", operator)
}
}

View File

@@ -16,12 +16,11 @@ import (
)
type conditionBuilder struct {
fm qbtypes.FieldMapper
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
}
func NewConditionBuilder(fm qbtypes.FieldMapper, metadataStore telemetrytypes.MetadataStore) *conditionBuilder {
return &conditionBuilder{fm: fm, metadataStore: metadataStore}
func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
return &conditionBuilder{fm: fm}
}
func (c *conditionBuilder) conditionFor(
@@ -37,7 +36,8 @@ func (c *conditionBuilder) conditionFor(
}
if column.IsJSONColumn() && querybuilder.BodyJSONQueryEnabled {
cond, err := c.buildJSONCondition(ctx, key, operator, value, sb)
valueType, value := InferDataType(value, operator, key)
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
if err != nil {
return "", err
}

View File

@@ -373,8 +373,7 @@ func TestConditionFor(t *testing.T) {
}
fm := NewFieldMapper()
mockMetadataStore := buildTestTelemetryMetadataStore()
conditionBuilder := NewConditionBuilder(fm, mockMetadataStore)
conditionBuilder := NewConditionBuilder(fm)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
@@ -427,8 +426,7 @@ func TestConditionForMultipleKeys(t *testing.T) {
}
fm := NewFieldMapper()
mockMetadataStore := buildTestTelemetryMetadataStore()
conditionBuilder := NewConditionBuilder(fm, mockMetadataStore)
conditionBuilder := NewConditionBuilder(fm)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
@@ -687,8 +685,7 @@ func TestConditionForJSONBodySearch(t *testing.T) {
}
fm := NewFieldMapper()
mockMetadataStore := buildTestTelemetryMetadataStore()
conditionBuilder := NewConditionBuilder(fm, mockMetadataStore)
conditionBuilder := NewConditionBuilder(fm)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()

View File

@@ -0,0 +1,72 @@
package telemetrylogs
import (
"context"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
// ConditionStrategy defines the interface for building conditions.
// Different strategies handle different data access patterns:
// - DirectExpressionStrategy: Physical columns, map expressions, simple JSON paths
// - LambdaExpressionStrategy: JSON paths with array traversal requiring nested lambdas
// - CompositeColumnStrategy: Logical fields spanning multiple physical columns
type ConditionStrategy interface {
// BuildCondition builds a SQL condition for the given key, operator, and value.
BuildCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error)
// CanHandle returns true if this strategy can handle the given field key and column.
CanHandle(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, column *schema.Column) bool
}
// ConditionStrategyRouter determines which strategy to use for a given field.
type ConditionStrategyRouter struct {
strategies []ConditionStrategy
fm qbtypes.FieldMapper
}
// NewConditionStrategyRouter creates a new router with all available strategies.
func NewConditionStrategyRouter(fm qbtypes.FieldMapper) *ConditionStrategyRouter {
return &ConditionStrategyRouter{
strategies: []ConditionStrategy{
NewCompositeColumnStrategy(fm),
NewLambdaExpressionStrategy(fm),
NewDirectExpressionStrategy(fm),
},
fm: fm,
}
}
// Route determines which strategy to use and builds the condition.
func (r *ConditionStrategyRouter) Route(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
column, err := r.fm.ColumnFor(ctx, key)
if err != nil {
return "", err
}
// Find the first strategy that can handle this field
for _, strategy := range r.strategies {
if strategy.CanHandle(ctx, key, column) {
return strategy.BuildCondition(ctx, key, operator, value, sb)
}
}
// Fallback to direct expression if no strategy matches
directStrategy := NewDirectExpressionStrategy(r.fm)
return directStrategy.BuildCondition(ctx, key, operator, value, sb)
}

View File

@@ -118,3 +118,30 @@ var (
},
}
)
// bodyExpression returns the core ClickHouse expression representing the
// conceptual "body" field when JSON body querying is enabled.
//
// NOTE: This intentionally does NOT include an alias so it can be reused
// both in SELECT lists (with an "AS body" suffix) and in WHERE clauses.
func bodyExpression() string {
if !querybuilder.BodyJSONQueryEnabled {
return LogsV2BodyColumn
}
jsonStringExpressions := []string{}
for _, key := range []string{LogsV2BodyJSONColumn, LogsV2BodyPromotedColumn} {
jsonStringExpressions = append(jsonStringExpressions, fmt.Sprintf("toJSONString(%s)", key))
}
jsonMergeExpr := fmt.Sprintf("jsonMergePatch(%s)", strings.Join(jsonStringExpressions, ", "))
return fmt.Sprintf("if(empty(%s), %s, %s)", LogsV2BodyColumn, LogsV2BodyColumn, jsonMergeExpr)
}
func bodyAliasExpression() string {
if !querybuilder.BodyJSONQueryEnabled {
return LogsV2BodyColumn
}
return fmt.Sprintf("%s as body", bodyExpression())
}

View File

@@ -0,0 +1,221 @@
package telemetrylogs
import (
"context"
"fmt"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
// DirectExpressionStrategy handles condition building for:
// 1. Physical Columns (direct access: severity_text, body, etc.)
// 2. Fields inside Physical Columns -> Expressions (like attribute_string['service.name'])
// 3. Simple JSON paths without array traversal
type DirectExpressionStrategy struct {
fm qbtypes.FieldMapper
}
func NewDirectExpressionStrategy(fm qbtypes.FieldMapper) *DirectExpressionStrategy {
return &DirectExpressionStrategy{fm: fm}
}
func (s *DirectExpressionStrategy) CanHandle(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, column *schema.Column) bool {
// Direct expressions can handle everything except:
// 1. JSON columns with array traversal (handled by LambdaExpressionStrategy)
// 2. Composite columns (handled by CompositeColumnStrategy)
// If it's a JSON column with array traversal, we need lambda
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled {
// Check if this requires array traversal (has JSONPlan with non-terminal nodes)
if len(key.JSONPlan) > 0 && !key.JSONPlan[0].IsTerminal {
return false
}
}
return true
}
func (s *DirectExpressionStrategy) BuildCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
column, err := s.fm.ColumnFor(ctx, key)
if err != nil {
return "", err
}
if operator.IsStringSearchOperator() {
value = querybuilder.FormatValueForContains(value)
}
tblFieldName, err := s.fm.FieldFor(ctx, key)
if err != nil {
return "", err
}
// Check if this is a body JSON search - either by FieldContext
if key.FieldContext == telemetrytypes.FieldContextBody {
tblFieldName, value = GetBodyJSONKey(ctx, key, operator, value)
}
tblFieldName, value = querybuilder.DataTypeCollisionHandledFieldName(key, value, tblFieldName, operator)
// Regular operators
return s.applyOperator(sb, tblFieldName, operator, value, column, key, ctx)
}
func (s *DirectExpressionStrategy) applyOperator(
sb *sqlbuilder.SelectBuilder,
tblFieldName string,
operator qbtypes.FilterOperator,
value any,
column *schema.Column,
key *telemetrytypes.TelemetryFieldKey,
ctx context.Context,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorEqual:
return sb.E(tblFieldName, value), nil
case qbtypes.FilterOperatorNotEqual:
return sb.NE(tblFieldName, value), nil
case qbtypes.FilterOperatorGreaterThan:
return sb.G(tblFieldName, value), nil
case qbtypes.FilterOperatorGreaterThanOrEq:
return sb.GE(tblFieldName, value), nil
case qbtypes.FilterOperatorLessThan:
return sb.LT(tblFieldName, value), nil
case qbtypes.FilterOperatorLessThanOrEq:
return sb.LE(tblFieldName, value), nil
case qbtypes.FilterOperatorLike:
return sb.Like(tblFieldName, value), nil
case qbtypes.FilterOperatorNotLike:
return sb.NotLike(tblFieldName, value), nil
case qbtypes.FilterOperatorILike:
return sb.ILike(tblFieldName, value), nil
case qbtypes.FilterOperatorNotILike:
return sb.NotILike(tblFieldName, value), nil
case qbtypes.FilterOperatorContains:
return sb.ILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorNotContains:
return sb.NotILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorRegexp:
return fmt.Sprintf(`match(%s, %s)`, sqlbuilder.Escape(tblFieldName), sb.Var(value)), nil
case qbtypes.FilterOperatorNotRegexp:
return fmt.Sprintf(`NOT match(%s, %s)`, sqlbuilder.Escape(tblFieldName), sb.Var(value)), nil
case qbtypes.FilterOperatorBetween:
values, ok := value.([]any)
if !ok || len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.Between(tblFieldName, values[0], values[1]), nil
case qbtypes.FilterOperatorNotBetween:
values, ok := value.([]any)
if !ok || len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.NotBetween(tblFieldName, values[0], values[1]), nil
case qbtypes.FilterOperatorIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
conditions := []string{}
for _, v := range values {
conditions = append(conditions, sb.E(tblFieldName, v))
}
return sb.Or(conditions...), nil
case qbtypes.FilterOperatorNotIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
conditions := []string{}
for _, v := range values {
conditions = append(conditions, sb.NE(tblFieldName, v))
}
return sb.And(conditions...), nil
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
return s.buildExistsCondition(ctx, key, operator, column, sb)
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported operator: %v", operator)
}
}
func (s *DirectExpressionStrategy) buildExistsCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
column *schema.Column,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.BodyJSONQueryEnabled {
if operator == qbtypes.FilterOperatorExists {
return GetBodyJSONKeyForExists(ctx, key, operator, nil), nil
}
return "NOT " + GetBodyJSONKeyForExists(ctx, key, operator, nil), nil
}
var value any
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
tblFieldName, err := s.fm.FieldFor(ctx, key)
if err != nil {
return "", err
}
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(tblFieldName), nil
}
return sb.IsNull(tblFieldName), nil
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
value = ""
if operator == qbtypes.FilterOperatorExists {
return sb.NE(column.Name, value), nil
}
return sb.E(column.Name, value), nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
}
case schema.ColumnTypeEnumString:
value = ""
if operator == qbtypes.FilterOperatorExists {
return sb.NE(column.Name, value), nil
}
return sb.E(column.Name, value), nil
case schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
value = 0
if operator == qbtypes.FilterOperatorExists {
return sb.NE(column.Name, value), nil
}
return sb.E(column.Name, value), nil
case schema.ColumnTypeEnumMap:
keyType := column.Type.(schema.MapColumnType).KeyType
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
}
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
if key.Materialized {
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
}
if operator == qbtypes.FilterOperatorExists {
return sb.E(leftOperand, true), nil
}
return sb.NE(leftOperand, true), nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
}
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
}
}

View File

@@ -140,20 +140,15 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
}
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
case telemetrytypes.FieldContextBody:
if querybuilder.BodyJSONQueryEnabled && (strings.Contains(key.Name, telemetrytypes.ArraySep) || strings.Contains(key.Name, telemetrytypes.ArrayAnyIndex)) {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "FieldFor not supported for the Array Paths: %s", key.Name)
}
if key.JSONDataType == nil {
return "", qbtypes.ErrColumnNotFound
}
fieldExpr := BodyJSONColumnPrefix + fmt.Sprintf("`%s`", key.Name)
expr := fmt.Sprintf("dynamicElement(%s, '%s')", fieldExpr, key.JSONDataType.StringValue())
if key.Materialized {
promotedFieldExpr := BodyPromotedColumnPrefix + fmt.Sprintf("`%s`", key.Name)
expr = fmt.Sprintf("coalesce(%s, %s)", expr, fmt.Sprintf("dynamicElement(%s, '%s')", promotedFieldExpr, key.JSONDataType.StringValue()))
if key.KeyNameContainsArray() && !key.JSONDataType.IsArray {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "FieldFor not supported for nested fields; only supported for flat paths (e.g. body.status.detail) and paths of Array type: %s(%s)", key.Name, key.FieldDataType)
}
return expr, nil
return m.buildFieldForJSON(key)
default:
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
}
@@ -240,3 +235,125 @@ func (m *fieldMapper) ColumnExpressionFor(
return fmt.Sprintf("%s AS `%s`", sqlbuilder.Escape(colName), field.Name), nil
}
// buildFieldForJSON builds the field expression for body JSON fields using arrayConcat pattern
func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (string, error) {
plan := key.JSONPlan
if len(plan) == 0 {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
"Could not find any valid paths for: %s", key.Name)
}
if plan[0].IsTerminal {
node := plan[0]
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
if key.Materialized {
if len(plan) < 2 {
return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
"plan length is less than 2 for promoted path: %s", key.Name)
}
// promoted column first then body_json column
// TODO(Piyush): Change this in future for better performance
expr = fmt.Sprintf("coalesce(%s, %s)",
fmt.Sprintf("dynamicElement(%s, '%s')", plan[1].FieldPath(), plan[1].TerminalConfig.ElemType.StringValue()),
expr,
)
}
return expr, nil
}
// Build arrayConcat pattern directly from the tree structure
arrayConcatExpr, err := m.buildArrayConcat(plan)
if err != nil {
return "", err
}
return arrayConcatExpr, nil
}
// buildArrayConcat builds the arrayConcat pattern directly from the tree structure
func (m *fieldMapper) buildArrayConcat(plan telemetrytypes.JSONAccessPlan) (string, error) {
if len(plan) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeGroupByPlanEmpty, "group by plan is empty while building arrayConcat")
}
// Build arrayMap expressions for ALL available branches at the root level
var arrayMapExpressions []string
for _, node := range plan {
for branchType := range node.Branches {
expr, err := m.buildArrayMap(node, branchType)
if err != nil {
return "", err
}
arrayMapExpressions = append(arrayMapExpressions, expr)
}
}
if len(arrayMapExpressions) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat")
}
// Build the arrayConcat expression
arrayConcatExpr := fmt.Sprintf("arrayConcat(%s)", strings.Join(arrayMapExpressions, ", "))
// Wrap with arrayFlatten
arrayFlattenExpr := fmt.Sprintf("arrayFlatten(%s)", arrayConcatExpr)
return arrayFlattenExpr, nil
}
// buildArrayMap builds the arrayMap expression for a specific branch, handling all sub-branches
func (m *fieldMapper) buildArrayMap(currentNode *telemetrytypes.JSONAccessNode, branchType telemetrytypes.JSONAccessBranchType) (string, error) {
if currentNode == nil {
return "", errors.Newf(errors.TypeInternal, CodeCurrentNodeNil, "current node is nil while building arrayMap")
}
nextNode := currentNode.Branches[branchType]
if nextNode == nil {
return "", errors.Newf(errors.TypeInternal, CodeNextNodeNil, "next node is nil while building arrayMap")
}
// Build the array expression for this level
var arrayExpr string
if branchType == telemetrytypes.BranchJSON {
// Array(JSON) branch
arrayExpr = fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')",
currentNode.FieldPath(), currentNode.MaxDynamicTypes, currentNode.MaxDynamicPaths)
} else {
// Array(Dynamic) branch - filter for JSON objects
dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", currentNode.FieldPath())
arrayExpr = fmt.Sprintf("arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr)
}
// If this is the terminal level, return the simple arrayMap
if nextNode.IsTerminal {
dynamicElementExpr := fmt.Sprintf("dynamicElement(%s, '%s')", nextNode.FieldPath(),
nextNode.TerminalConfig.ElemType.StringValue(),
)
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), dynamicElementExpr, arrayExpr), nil
}
// For non-terminal nodes, we need to handle ALL possible branches at the next level
var nestedExpressions []string
for branchType := range nextNode.Branches {
expr, err := m.buildArrayMap(nextNode, branchType)
if err != nil {
return "", err
}
nestedExpressions = append(nestedExpressions, expr)
}
// If we have multiple nested expressions, we need to concat them
var nestedExpr string
if len(nestedExpressions) == 1 {
nestedExpr = nestedExpressions[0]
} else if len(nestedExpressions) > 1 {
nestedExpr = fmt.Sprintf("arrayConcat(%s)", strings.Join(nestedExpressions, ", "))
} else {
return "", errors.Newf(errors.TypeInternal, CodeNestedExpressionsEmpty, "nested expressions are empty while building arrayMap")
}
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), nestedExpr, arrayExpr), nil
}

View File

@@ -11,7 +11,7 @@ import (
// TestLikeAndILikeWithoutWildcards_Warns Tests that LIKE/ILIKE without wildcards add warnings and include docs URL
func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm, nil)
cb := NewConditionBuilder(fm)
keys := buildCompleteFieldKeyMap()
@@ -47,7 +47,7 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
// TestLikeAndILikeWithWildcards_NoWarn Tests that LIKE/ILIKE with wildcards do not add warnings
func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm, nil)
cb := NewConditionBuilder(fm)
keys := buildCompleteFieldKeyMap()

View File

@@ -7,7 +7,6 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/huandu/go-sqlbuilder"
"github.com/stretchr/testify/require"
)
@@ -15,7 +14,7 @@ import (
// TestFilterExprLogsBodyJSON tests a comprehensive set of query patterns for body JSON search
func TestFilterExprLogsBodyJSON(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm, telemetrytypestest.NewMockMetadataStore())
cb := NewConditionBuilder(fm)
// Define a comprehensive set of field keys to support all test cases
keys := buildCompleteFieldKeyMap()

View File

@@ -16,7 +16,7 @@ import (
// TestFilterExprLogs tests a comprehensive set of query patterns for logs search
func TestFilterExprLogs(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm, nil)
cb := NewConditionBuilder(fm)
// Define a comprehensive set of field keys to support all test cases
keys := buildCompleteFieldKeyMap()
@@ -2423,7 +2423,7 @@ func TestFilterExprLogs(t *testing.T) {
// TestFilterExprLogs tests a comprehensive set of query patterns for logs search
func TestFilterExprLogsConflictNegation(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm, nil)
cb := NewConditionBuilder(fm)
// Define a comprehensive set of field keys to support all test cases
keys := buildCompleteFieldKeyMap()

View File

@@ -1,179 +0,0 @@
package telemetrylogs
import (
"context"
"slices"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
var (
CodePlanIndexOutOfBounds = errors.MustNewCode("plan_index_out_of_bounds")
)
type JSONAccessPlanBuilder struct {
key *telemetrytypes.TelemetryFieldKey
value any
op qbtypes.FilterOperator
parts []string
isPromoted bool
typeCache map[string][]telemetrytypes.JSONDataType
}
// buildPlan recursively builds the path plan tree
func (pb *JSONAccessPlanBuilder) buildPlan(index int, parent *telemetrytypes.JSONAccessNode, isDynArrChild bool) (*telemetrytypes.JSONAccessNode, error) {
if index >= len(pb.parts) {
return nil, errors.NewInvalidInputf(CodePlanIndexOutOfBounds, "index is out of bounds")
}
part := pb.parts[index]
pathSoFar := strings.Join(pb.parts[:index+1], telemetrytypes.ArraySep)
isTerminal := index == len(pb.parts)-1
// Calculate progression parameters based on parent's values
var maxTypes, maxPaths int
if isDynArrChild {
// Child of Dynamic array - reset progression to base values (16, 256)
// This happens when we switch from Array(Dynamic) to Array(JSON)
maxTypes = 16
maxPaths = 256
} else if parent != nil {
// Child of JSON array - use parent's progression divided by 2 and 4
maxTypes = parent.MaxDynamicTypes / 2
maxPaths = parent.MaxDynamicPaths / 4
if maxTypes < 0 {
maxTypes = 0
}
if maxPaths < 0 {
maxPaths = 0
}
}
// Use cached types from the batched metadata query
types := pb.typeCache[pathSoFar]
// Create node for this path segment
node := &telemetrytypes.JSONAccessNode{
Name: part,
IsTerminal: isTerminal,
AvailableTypes: types,
Branches: make(map[telemetrytypes.JSONAccessBranchType]*telemetrytypes.JSONAccessNode),
Parent: parent,
MaxDynamicTypes: maxTypes,
MaxDynamicPaths: maxPaths,
}
hasJSON := slices.Contains(node.AvailableTypes, telemetrytypes.ArrayJSON)
hasDynamic := slices.Contains(node.AvailableTypes, telemetrytypes.ArrayDynamic)
// Configure terminal if this is the last part
if isTerminal {
valueType, _ := inferDataType(pb.value, pb.op, pb.key)
node.TerminalConfig = &telemetrytypes.TerminalConfig{
Key: pb.key,
ElemType: *pb.key.JSONDataType,
ValueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType],
}
} else {
var err error
if hasJSON {
node.Branches[telemetrytypes.BranchJSON], err = pb.buildPlan(index+1, node, false)
if err != nil {
return nil, err
}
}
if hasDynamic {
node.Branches[telemetrytypes.BranchDynamic], err = pb.buildPlan(index+1, node, true)
if err != nil {
return nil, err
}
}
}
return node, nil
}
// PlanJSON builds a tree structure representing the complete JSON path traversal
// that precomputes all possible branches and their types
func PlanJSON(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, op qbtypes.FilterOperator,
value any,
metadataStore telemetrytypes.MetadataStore,
) (telemetrytypes.JSONAccessPlan, error) {
// if path is empty, return nil
if key.Name == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
}
path := strings.ReplaceAll(key.Name, telemetrytypes.ArrayAnyIndex, telemetrytypes.ArraySep)
parts := strings.Split(path, telemetrytypes.ArraySep)
// Pre-fetch JSON types for all path prefixes in a single metadata call to avoid
// multiple small DB queries during plan construction.
// Extract all path prefixes that will be needed during recursive buildPlan calls
selectors := make([]*telemetrytypes.FieldKeySelector, 0, len(parts))
for i := range parts {
pathSoFar := strings.Join(parts[:i+1], telemetrytypes.ArraySep)
selectors = append(selectors, &telemetrytypes.FieldKeySelector{
Name: pathSoFar,
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeExact,
Signal: telemetrytypes.SignalLogs,
Limit: 1,
})
}
keys, _, err := metadataStore.GetKeysMulti(ctx, selectors)
if err != nil {
return nil, err
}
// Build type cache from the batched results
typeCache := make(map[string][]telemetrytypes.JSONDataType, len(keys))
for name, ks := range keys {
types := make([]telemetrytypes.JSONDataType, 0, len(ks))
for _, k := range ks {
if k.JSONDataType != nil {
types = append(types, *k.JSONDataType)
}
}
typeCache[name] = types
}
pb := &JSONAccessPlanBuilder{
key: key,
op: op,
value: value,
parts: parts,
isPromoted: key.Materialized,
typeCache: typeCache,
}
plans := telemetrytypes.JSONAccessPlan{}
node, err := pb.buildPlan(0,
telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn,
32, 0),
false,
)
if err != nil {
return nil, err
}
plans = append(plans, node)
// TODO: PlanJSON requires the Start and End of the Query to select correct column between promoted and body_json using
// creation time in distributed_promoted_paths
if pb.isPromoted {
node, err := pb.buildPlan(0,
telemetrytypes.NewRootJSONAccessNode(LogsV2BodyPromotedColumn,
32, 1024),
true,
)
if err != nil {
return nil, err
}
plans = append(plans, node)
}
return plans, nil
}

View File

@@ -1,10 +1,8 @@
package telemetrylogs
import (
"context"
"fmt"
"slices"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
@@ -22,19 +20,22 @@ var (
CodePromotedPlanMissing = errors.MustNewCode("promoted_plan_missing")
CodeArrayNavigationFailed = errors.MustNewCode("array_navigation_failed")
)
type jsonConditionBuilder struct {
key *telemetrytypes.TelemetryFieldKey
valueType telemetrytypes.JSONDataType
}
func NewJSONConditionBuilder( key *telemetrytypes.TelemetryFieldKey, valueType telemetrytypes.FieldDataType) *jsonConditionBuilder {
return &jsonConditionBuilder{key: key, valueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType]}
}
// BuildCondition builds the full WHERE condition for body_json JSON paths
func (c *conditionBuilder) buildJSONCondition(ctx context.Context, key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
plan, err := PlanJSON(ctx, key, operator, value, c.metadataStore)
if err != nil {
return "", err
}
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
conditions := []string{}
for _, plan := range plan {
condition, err := c.emitPlannedCondition(plan, operator, value, sb)
for _, node := range c.key.JSONPlan {
condition, err := c.emitPlannedCondition(node, operator, value, sb)
if err != nil {
return "", err
}
@@ -44,9 +45,9 @@ func (c *conditionBuilder) buildJSONCondition(ctx context.Context, key *telemetr
}
// emitPlannedCondition handles paths with array traversal
func (c *conditionBuilder) emitPlannedCondition(plan *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
func (c *jsonConditionBuilder) emitPlannedCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
// Build traversal + terminal recursively per-hop
compiled, err := c.recurseArrayHops(plan, operator, value, sb)
compiled, err := c.recurseArrayHops(node, operator, value, sb)
if err != nil {
return "", err
}
@@ -54,12 +55,12 @@ func (c *conditionBuilder) emitPlannedCondition(plan *telemetrytypes.JSONAccessN
}
// buildTerminalCondition creates the innermost condition
func (c *conditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
func (c *jsonConditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if node.TerminalConfig.ElemType.IsArray {
conditions := []string{}
// if the value type is not an array
// TODO(piyush): Confirm the Query built for Array case and add testcases for it later
if !node.TerminalConfig.ValueType.IsArray {
if !c.valueType.IsArray {
// if operator is a String search Operator, then we need to build one more String comparison condition along with the Strict match condition
if operator.IsStringSearchOperator() {
formattedValue := querybuilder.FormatValueForContains(value)
@@ -93,7 +94,7 @@ func (c *conditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAcces
// buildPrimitiveTerminalCondition builds the condition if the terminal node is a primitive type
// it handles the data type collisions and utilizes indexes for the condition if available
func (c *conditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
fieldPath := node.FieldPath()
conditions := []string{}
var formattedValue any = value
@@ -164,19 +165,19 @@ func (c *conditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes.
}
// buildArrayMembershipCondition handles array membership checks
func (c *conditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
func (c *jsonConditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
arrayPath := node.FieldPath()
localKeyCopy := *node.TerminalConfig.Key
// create typed array out of a dynamic array
filteredDynamicExpr := func() string {
// Change the field data type from []dynamic to the value type
// since we've filtered the value type out of the dynamic array, we need to change the field data corresponding to the value type
localKeyCopy.FieldDataType = telemetrytypes.MappingJSONDataTypeToFieldDataType[telemetrytypes.ScalerTypeToArrayType[node.TerminalConfig.ValueType]]
localKeyCopy.FieldDataType = telemetrytypes.MappingJSONDataTypeToFieldDataType[telemetrytypes.ScalerTypeToArrayType[c.valueType]]
baseArrayDynamicExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", arrayPath)
return fmt.Sprintf("arrayMap(x->dynamicElement(x, '%s'), arrayFilter(x->(dynamicType(x) = '%s'), %s))",
node.TerminalConfig.ValueType.StringValue(),
node.TerminalConfig.ValueType.StringValue(),
c.valueType.StringValue(),
c.valueType.StringValue(),
baseArrayDynamicExpr)
}
typedArrayExpr := func() string {
@@ -200,7 +201,7 @@ func (c *conditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JS
}
// recurseArrayHops recursively builds array traversal conditions
func (c *conditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
func (c *jsonConditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if current == nil {
return "", errors.NewInternalf(CodeArrayNavigationFailed, "navigation failed, current node is nil")
}
@@ -247,7 +248,7 @@ func (c *conditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAccessNo
return sb.Or(branches...), nil
}
func (c *conditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, fieldExpr string, operator qbtypes.FilterOperator, value any) (string, error) {
func (c *jsonConditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, fieldExpr string, operator qbtypes.FilterOperator, value any) (string, error) {
switch operator {
case qbtypes.FilterOperatorEqual:
return sb.E(fieldExpr, value), nil
@@ -304,176 +305,6 @@ func (c *conditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, fieldExpr
}
}
// GroupByArrayJoinInfo contains information about array joins needed for GroupBy
type GroupByArrayJoinInfo struct {
ArrayJoinClauses []string // ARRAY JOIN clauses to add to FROM clause
TerminalExpr string // Terminal field expression for SELECT/GROUP BY
}
// BuildGroupBy builds GroupBy information for body JSON fields using arrayConcat pattern
//
// BuildGroupBy was designed to be used for group by queries on body JSON fields existings inside arrays but
// currently it is not used anywhere, considering this case suits more to Data Engineering instead of Observability space.
// This code should be removed in future.
func (c *conditionBuilder) BuildGroupBy(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*GroupByArrayJoinInfo, error) {
path := strings.TrimPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix)
plan, err := PlanJSON(ctx, key, qbtypes.FilterOperatorExists, nil, c.metadataStore)
if err != nil {
return nil, err
}
if len(plan) == 0 {
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
"Could not find any valid paths for: %s", path)
}
if plan[0].IsTerminal {
node := plan[0]
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
if key.Materialized {
if len(plan) < 2 {
return nil, errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
"plan length is less than 2 for promoted path: %s", path)
}
// promoted column first then body_json column
// TODO(Piyush): Change this in future for better performance
expr = fmt.Sprintf("coalesce(%s, %s)",
fmt.Sprintf("dynamicElement(%s, '%s')", plan[1].FieldPath(), plan[1].TerminalConfig.ElemType.StringValue()),
expr,
)
}
return &GroupByArrayJoinInfo{
ArrayJoinClauses: []string{},
TerminalExpr: expr,
}, nil
}
// Build arrayConcat pattern directly from the tree structure
arrayConcatExpr, err := c.buildArrayConcat(plan)
if err != nil {
return nil, err
}
// Create single ARRAY JOIN clause with arrayFlatten
arrayJoinClause := fmt.Sprintf("ARRAY JOIN %s AS `%s`", arrayConcatExpr, key.Name)
return &GroupByArrayJoinInfo{
ArrayJoinClauses: []string{arrayJoinClause},
TerminalExpr: fmt.Sprintf("`%s`", key.Name),
}, nil
}
// buildArrayConcat builds the arrayConcat pattern directly from the tree structure
func (c *conditionBuilder) buildArrayConcat(plan telemetrytypes.JSONAccessPlan) (string, error) {
if len(plan) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeGroupByPlanEmpty, "group by plan is empty while building arrayConcat")
}
// Build arrayMap expressions for ALL available branches at the root level
var arrayMapExpressions []string
for _, node := range plan {
hasJSON := node.Branches[telemetrytypes.BranchJSON] != nil
hasDynamic := node.Branches[telemetrytypes.BranchDynamic] != nil
if hasJSON {
jsonExpr, err := c.buildArrayMap(node, telemetrytypes.BranchJSON)
if err != nil {
return "", err
}
arrayMapExpressions = append(arrayMapExpressions, jsonExpr)
}
if hasDynamic {
dynamicExpr, err := c.buildArrayMap(node, telemetrytypes.BranchDynamic)
if err != nil {
return "", err
}
arrayMapExpressions = append(arrayMapExpressions, dynamicExpr)
}
}
if len(arrayMapExpressions) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat")
}
// Build the arrayConcat expression
arrayConcatExpr := fmt.Sprintf("arrayConcat(%s)", strings.Join(arrayMapExpressions, ", "))
// Wrap with arrayFlatten
arrayFlattenExpr := fmt.Sprintf("arrayFlatten(%s)", arrayConcatExpr)
return arrayFlattenExpr, nil
}
// buildArrayMap builds the arrayMap expression for a specific branch, handling all sub-branches
func (c *conditionBuilder) buildArrayMap(currentNode *telemetrytypes.JSONAccessNode, branchType telemetrytypes.JSONAccessBranchType) (string, error) {
if currentNode == nil {
return "", errors.Newf(errors.TypeInternal, CodeCurrentNodeNil, "current node is nil while building arrayMap")
}
nextNode := currentNode.Branches[branchType]
if nextNode == nil {
return "", errors.Newf(errors.TypeInternal, CodeNextNodeNil, "next node is nil while building arrayMap")
}
// Build the array expression for this level
var arrayExpr string
if branchType == telemetrytypes.BranchJSON {
// Array(JSON) branch
arrayExpr = fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')",
currentNode.FieldPath(), currentNode.MaxDynamicTypes, currentNode.MaxDynamicPaths)
} else {
// Array(Dynamic) branch - filter for JSON objects
dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", currentNode.FieldPath())
arrayExpr = fmt.Sprintf("arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr)
}
// If this is the terminal level, return the simple arrayMap
if nextNode.IsTerminal {
dynamicElementExpr := fmt.Sprintf("dynamicElement(%s, '%s')", nextNode.FieldPath(),
nextNode.TerminalConfig.ElemType.StringValue(),
)
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), dynamicElementExpr, arrayExpr), nil
}
// For non-terminal nodes, we need to handle ALL possible branches at the next level
var nestedExpressions []string
hasJSON := nextNode.Branches[telemetrytypes.BranchJSON] != nil
hasDynamic := nextNode.Branches[telemetrytypes.BranchDynamic] != nil
if hasJSON {
jsonNested, err := c.buildArrayMap(nextNode, telemetrytypes.BranchJSON)
if err != nil {
return "", err
}
nestedExpressions = append(nestedExpressions, jsonNested)
}
if hasDynamic {
dynamicNested, err := c.buildArrayMap(nextNode, telemetrytypes.BranchDynamic)
if err != nil {
return "", err
}
nestedExpressions = append(nestedExpressions, dynamicNested)
}
// If we have multiple nested expressions, we need to concat them
var nestedExpr string
if len(nestedExpressions) == 1 {
nestedExpr = nestedExpressions[0]
} else if len(nestedExpressions) > 1 {
// This shouldn't happen in our current tree structure, but handle it just in case
nestedExpr = fmt.Sprintf("arrayConcat(%s)", strings.Join(nestedExpressions, ", "))
} else {
return "", errors.Newf(errors.TypeInternal, CodeNestedExpressionsEmpty, "nested expressions are empty while building arrayMap")
}
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), nestedExpr, arrayExpr), nil
}
func assumeNotNull(column string, elemType telemetrytypes.JSONDataType) string {
return fmt.Sprintf("assumeNotNull(dynamicElement(%s, '%s'))", column, elemType.StringValue())
}

View File

@@ -137,7 +137,7 @@ func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) {
},
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf((dynamicElement(body_json.`user.age`, 'Int64') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'Int64') IS NOT NULL), toString(coalesce(dynamicElement(body_json.`user.age`, 'Int64'), dynamicElement(body_json_promoted.`user.age`, 'Int64'))), (dynamicElement(body_json.`user.age`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json.`user.age`, 'String'), dynamicElement(body_json_promoted.`user.age`, 'String')), NULL)) AS `user.age`, toString(multiIf((dynamicElement(body_json.`user.name`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.name`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json.`user.name`, 'String'), dynamicElement(body_json_promoted.`user.name`, 'String')), NULL)) AS `user.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `user.age`, `user.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf((dynamicElement(body_json.`user.age`, 'Int64') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'Int64') IS NOT NULL), toString(coalesce(dynamicElement(body_json.`user.age`, 'Int64'), dynamicElement(body_json_promoted.`user.age`, 'Int64'))), (dynamicElement(body_json.`user.age`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json.`user.age`, 'String'), dynamicElement(body_json_promoted.`user.age`, 'String')), NULL)) AS `user.age`, toString(multiIf((dynamicElement(body_json.`user.name`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.name`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json.`user.name`, 'String'), dynamicElement(body_json_promoted.`user.name`, 'String')), NULL)) AS `user.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`user.age`, `user.name`) GLOBAL IN (SELECT `user.age`, `user.name` FROM __limit_cte) GROUP BY ts, `user.age`, `user.name`",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf((dynamicElement(body_json.`user.age`, 'Int64') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'Int64') IS NOT NULL), toString(coalesce(dynamicElement(body_json_promoted.`user.age`, 'Int64'), dynamicElement(body_json.`user.age`, 'Int64'))), (dynamicElement(body_json.`user.age`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json_promoted.`user.age`, 'String'), dynamicElement(body_json.`user.age`, 'String')), NULL)) AS `user.age`, toString(multiIf((dynamicElement(body_json.`user.name`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.name`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json_promoted.`user.name`, 'String'), dynamicElement(body_json.`user.name`, 'String')), NULL)) AS `user.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `user.age`, `user.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf((dynamicElement(body_json.`user.age`, 'Int64') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'Int64') IS NOT NULL), toString(coalesce(dynamicElement(body_json_promoted.`user.age`, 'Int64'), dynamicElement(body_json.`user.age`, 'Int64'))), (dynamicElement(body_json.`user.age`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json_promoted.`user.age`, 'String'), dynamicElement(body_json.`user.age`, 'String')), NULL)) AS `user.age`, toString(multiIf((dynamicElement(body_json.`user.name`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.name`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json_promoted.`user.name`, 'String'), dynamicElement(body_json.`user.name`, 'String')), NULL)) AS `user.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`user.age`, `user.name`) GLOBAL IN (SELECT `user.age`, `user.name` FROM __limit_cte) GROUP BY ts, `user.age`, `user.name`",
Args: []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)},
},
},
@@ -159,6 +159,84 @@ func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) {
}
}
func TestStatementBuilderListQueryBodyHas(t *testing.T) {
enableBodyJSONQuery(t)
defer func() {
disableBodyJSONQuery(t)
}()
statementBuilder := buildJSONTestStatementBuilder(t)
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]
expected qbtypes.Statement
expectedErr error
}{
{
name: "Simple has filter",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "has(body.education[].parameters, 1.65)"},
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, body_json, body_json_promoted, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (has(arrayFlatten(arrayConcat(arrayMap(`body_json.education`->dynamicElement(`body_json.education`.`parameters`, 'Array(Nullable(Float64))'), dynamicElement(body_json.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))')))), ?) OR has(arrayFlatten(arrayConcat(arrayMap(`body_json.education`->dynamicElement(`body_json.education`.`parameters`, 'Array(Dynamic)'), dynamicElement(body_json.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))')))), ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), 1.65, 1.65, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Warnings: []string{
"Key `education[].parameters` is ambiguous, found 2 different combinations of field context / data type: [name=education[].parameters,context=body,datatype=[]float64 name=education[].parameters,context=body,datatype=[]dynamic].",
},
},
expectedErr: nil,
},
{
name: "Flat path hasAll filter",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "hasAll(body.user.permissions, ['read', 'write'])"},
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, body_json, body_json_promoted, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND hasAll(dynamicElement(body_json.`user.permissions`, 'Array(Nullable(String))'), ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), []any{[]any{"read", "write"}}, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "Nested path hasAny filter",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "hasAny(education[].awards[].participated[].members, ['Piyush', 'Tushar'])"},
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, body_json, body_json_promoted, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND hasAny(arrayFlatten(arrayConcat(arrayMap(`body_json.education`->arrayConcat(arrayMap(`body_json.education[].awards`->arrayConcat(arrayMap(`body_json.education[].awards[].participated`->dynamicElement(`body_json.education[].awards[].participated`.`members`, 'Array(Nullable(String))'), dynamicElement(`body_json.education[].awards`.`participated`, 'Array(JSON(max_dynamic_types=4, max_dynamic_paths=0))')), arrayMap(`body_json.education[].awards[].participated`->dynamicElement(`body_json.education[].awards[].participated`.`members`, 'Array(Nullable(String))'), arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_json.education[].awards`.`participated`, 'Array(Dynamic)'))))), dynamicElement(`body_json.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')), arrayMap(`body_json.education[].awards`->arrayConcat(arrayMap(`body_json.education[].awards[].participated`->dynamicElement(`body_json.education[].awards[].participated`.`members`, 'Array(Nullable(String))'), dynamicElement(`body_json.education[].awards`.`participated`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=256))')), arrayMap(`body_json.education[].awards[].participated`->dynamicElement(`body_json.education[].awards[].participated`.`members`, 'Array(Nullable(String))'), arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_json.education[].awards`.`participated`, 'Array(Dynamic)'))))), arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_json.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_json.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))')))), ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), []any{[]any{"Piyush", "Tushar"}}, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}
func TestStatementBuilderListQueryBody(t *testing.T) {
enableBodyJSONQuery(t)
defer func() {
@@ -674,10 +752,10 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
}
}
func buildTestTelemetryMetadataStore(promotedPaths ...string) *telemetrytypestest.MockMetadataStore {
func buildTestTelemetryMetadataStore(t *testing.T, promotedPaths ...string) *telemetrytypestest.MockMetadataStore {
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
types, _ := testTypeSet()
types, _ := telemetrytypes.TestJSONTypeSet()
for path, jsonTypes := range types {
promoted := false
@@ -698,6 +776,11 @@ func buildTestTelemetryMetadataStore(promotedPaths ...string) *telemetrytypestes
JSONDataType: &jsonType,
Materialized: promoted,
}
err := key.SetJSONAccessPlan(telemetrytypes.JSONColumnMetadata{
BaseColumn: LogsV2BodyJSONColumn,
PromotedColumn: LogsV2BodyPromotedColumn,
}, types)
require.NoError(t, err)
mockMetadataStore.SetKey(key)
}
}
@@ -705,10 +788,10 @@ func buildTestTelemetryMetadataStore(promotedPaths ...string) *telemetrytypestes
return mockMetadataStore
}
func buildJSONTestStatementBuilder(_ *testing.T, promotedPaths ...string) *logQueryStatementBuilder {
func buildJSONTestStatementBuilder(t *testing.T, promotedPaths ...string) *logQueryStatementBuilder {
mockMetadataStore := buildTestTelemetryMetadataStore(t, promotedPaths...)
fm := NewFieldMapper()
mockMetadataStore := buildTestTelemetryMetadataStore(promotedPaths...)
cb := NewConditionBuilder(fm, mockMetadataStore)
cb := NewConditionBuilder(fm)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(

View File

@@ -40,14 +40,14 @@ func parseStrValue(valueStr string, operator qbtypes.FilterOperator) (telemetryt
return valueType, parsedValue
}
func inferDataType(value any, operator qbtypes.FilterOperator, key *telemetrytypes.TelemetryFieldKey) (telemetrytypes.FieldDataType, any) {
func InferDataType(value any, operator qbtypes.FilterOperator, key *telemetrytypes.TelemetryFieldKey) (telemetrytypes.FieldDataType, any) {
// check if the value is a int, float, string, bool
valueType := telemetrytypes.FieldDataTypeUnspecified
switch v := value.(type) {
case []any:
// take the first element and infer the type
if len(v) > 0 {
valueType, _ = inferDataType(v[0], operator, key)
valueType, _ = InferDataType(v[0], operator, key)
}
return valueType, v
case uint8, uint16, uint32, uint64, int, int8, int16, int32, int64:
@@ -84,7 +84,7 @@ func getBodyJSONPath(key *telemetrytypes.TelemetryFieldKey) string {
}
func GetBodyJSONKey(_ context.Context, key *telemetrytypes.TelemetryFieldKey, operator qbtypes.FilterOperator, value any) (string, any) {
dataType, value := inferDataType(value, operator, key)
dataType, value := InferDataType(value, operator, key)
// for array types, we need to extract the value from the JSON_QUERY
if dataType == telemetrytypes.FieldDataTypeArrayInt64 ||

View File

@@ -0,0 +1,57 @@
package telemetrylogs
import (
"context"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
// LambdaExpressionStrategy handles condition building for JSON fields
// that require nested lambda functions (arrayExists, arrayMap) because
// ClickHouse doesn't allow flat predicates for fields inside arrays.
type LambdaExpressionStrategy struct {
fm qbtypes.FieldMapper
}
func NewLambdaExpressionStrategy(fm qbtypes.FieldMapper) *LambdaExpressionStrategy {
return &LambdaExpressionStrategy{fm: fm}
}
func (s *LambdaExpressionStrategy) CanHandle(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, column *schema.Column) bool {
// Lambda expressions are needed for JSON columns with array traversal
if column.Type.GetType() != schema.ColumnTypeEnumJSON {
return false
}
if !querybuilder.BodyJSONQueryEnabled {
return false
}
// Check if this requires array traversal (has JSONPlan with non-terminal nodes)
if len(key.JSONPlan) == 0 {
return false
}
// If the first node is terminal, we can use direct expression
if key.JSONPlan[0].IsTerminal {
return false
}
return true
}
func (s *LambdaExpressionStrategy) BuildCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
valueType, value := InferDataType(value, operator, key)
jsonBuilder := NewJSONConditionBuilder(key, valueType)
return jsonBuilder.buildJSONCondition(operator, value, sb)
}

View File

@@ -0,0 +1,88 @@
package telemetrylogs
import (
"fmt"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/huandu/go-sqlbuilder"
)
// logicalFieldMetadata defines how a conceptual/logical field (like "body")
// is projected and how specific operators are translated into conditions.
//
// This allows us to treat fields that are backed by multiple physical columns
// (e.g. body, body_json, body_json_promoted) in a single, centralized place
// without scattering ad-hoc condition logic across builders.
type logicalFieldMetadata struct {
FieldExpressions []string
// Conditions optionally overrides how specific operators are translated
// into SQL conditions for this logical field. If an operator is not present
// here, we fall back to the standard condition-building flow.
Conditions qbtypes.OperatorToConditionRegistry
}
// buildBodyLogicalFieldMetadata creates the metadata for the "body" logical field.
// This function is used to avoid circular dependencies when condition functions
// need to access FieldExpressions.
func buildBodyLogicalFieldMetadata() logicalFieldMetadata {
// Capture field expressions in a local variable to avoid accessing the registry
// from within condition functions (which would create a circular dependency)
fieldExpressions := []string{
LogsV2BodyColumn,
}
if querybuilder.BodyJSONQueryEnabled {
fieldExpressions = append(fieldExpressions, jsonMergeExpr())
}
return logicalFieldMetadata{
FieldExpressions: fieldExpressions,
Conditions: qbtypes.OperatorToConditionRegistry{
// body Exists / body != ""
qbtypes.FilterOperatorExists: func(sb *sqlbuilder.SelectBuilder, _ any) (string, error) {
conditions := []string{
sb.Equal(emptyFunctionWrapped(LogsV2BodyColumn), false),
}
if !querybuilder.BodyJSONQueryEnabled {
return conditions[0], nil
}
conditions = append(conditions, sb.Equal(emptyFunctionWrapped(LogsV2BodyJSONColumn), false))
conditions = append(conditions, sb.Equal(emptyFunctionWrapped(LogsV2BodyPromotedColumn), false))
// Any of the representations being non-empty counts as "exists".
return sb.Or(conditions...), nil
},
// body Not Exists / body == ""
qbtypes.FilterOperatorNotExists: func(sb *sqlbuilder.SelectBuilder, _ any) (string, error) {
conditions := []string{
sb.Equal(emptyFunctionWrapped(LogsV2BodyColumn), true),
}
if !querybuilder.BodyJSONQueryEnabled {
return conditions[0], nil
}
conditions = append(conditions, sb.Equal(emptyFunctionWrapped(LogsV2BodyJSONColumn), true))
conditions = append(conditions, sb.Equal(emptyFunctionWrapped(LogsV2BodyPromotedColumn), true))
// All representations must be empty for "not exists".
return sb.And(conditions...), nil
},
qbtypes.FilterOperatorRegexp: func(sb *sqlbuilder.SelectBuilder, value any) (string, error) {
conditions := []string{}
for _, expression := range fieldExpressions {
conditions = append(conditions, fmt.Sprintf(`match(LOWER(%s), LOWER(%s))`, expression, sb.Var(value)))
}
return sb.Or(conditions...), nil
},
qbtypes.FilterOperatorNotRegexp: func(sb *sqlbuilder.SelectBuilder, value any) (string, error) {
conditions := []string{}
for _, expression := range fieldExpressions {
conditions = append(conditions, fmt.Sprintf(`NOT match(LOWER(%s), LOWER(%s))`, expression, sb.Var(value)))
}
return sb.Or(conditions...), nil
},
},
}
}
var logicalFieldRegistry = map[string]logicalFieldMetadata{
"body": buildBodyLogicalFieldMetadata(),
}

View File

@@ -179,6 +179,7 @@ func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[stri
if len(fieldKeys) == 1 {
k.JSONDataType = fieldKeys[0].JSONDataType
k.Materialized = fieldKeys[0].Materialized
k.JSONPlan = fieldKeys[0].JSONPlan
}
k.Indexes = fieldKeys[0].Indexes
@@ -253,11 +254,7 @@ func (b *logQueryStatementBuilder) buildListQuery(
sb.SelectMore(LogsV2SeverityNumberColumn)
sb.SelectMore(LogsV2ScopeNameColumn)
sb.SelectMore(LogsV2ScopeVersionColumn)
sb.SelectMore(LogsV2BodyColumn)
if querybuilder.BodyJSONQueryEnabled {
sb.SelectMore(LogsV2BodyJSONColumn)
sb.SelectMore(LogsV2BodyPromotedColumn)
}
sb.SelectMore(bodyAliasExpression())
sb.SelectMore(LogsV2AttributesStringColumn)
sb.SelectMore(LogsV2AttributesNumberColumn)
sb.SelectMore(LogsV2AttributesBoolColumn)

View File

@@ -195,10 +195,10 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
},
}
fm := NewFieldMapper()
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
cb := NewConditionBuilder(fm, mockMetadataStore)
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
@@ -315,10 +315,10 @@ func TestStatementBuilderListQuery(t *testing.T) {
},
}
fm := NewFieldMapper()
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
cb := NewConditionBuilder(fm, mockMetadataStore)
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
@@ -423,10 +423,10 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
},
}
fm := NewFieldMapper()
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
cb := NewConditionBuilder(fm, mockMetadataStore)
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
@@ -499,10 +499,10 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
},
}
fm := NewFieldMapper()
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
cb := NewConditionBuilder(fm, mockMetadataStore)
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
@@ -594,10 +594,10 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
},
}
fm := NewFieldMapper()
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()
cb := NewConditionBuilder(fm, mockMetadataStore)
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)

View File

@@ -38,7 +38,7 @@ var (
CodeFailedToAppendPath = errors.MustNewCode("failed_to_append_path_promoted_paths")
)
// GetBodyJSONPaths extracts body JSON paths from the path_types table
// fetchBodyJSONPaths extracts body JSON paths from the path_types table
// This function can be used by both JSONQueryBuilder and metadata extraction
// uniquePathLimit: 0 for no limit, >0 for maximum number of unique paths to return
// - For startup load: set to 10000 to get top 10k unique paths
@@ -46,14 +46,12 @@ var (
// - For metadata API: set to desired pagination limit
//
// searchOperator: LIKE for pattern matching, EQUAL for exact match
// Returns: (paths, error)
func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context,
fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
func (t *telemetryMetaStore) fetchBodyJSONPaths(ctx context.Context,
fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, []string, bool, error) {
query, args, limit := buildGetBodyJSONPathsQuery(fieldKeySelectors)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to extract body JSON keys")
return nil, nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to extract body JSON keys")
}
defer rows.Close()
@@ -67,7 +65,7 @@ func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context,
err = rows.Scan(&path, &typesArray, &lastSeen)
if err != nil {
return nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to scan body JSON key row")
return nil, nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to scan body JSON key row")
}
for _, typ := range typesArray {
@@ -89,7 +87,18 @@ func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context,
rowCount++
}
if rows.Err() != nil {
return nil, false, errors.WrapInternalf(rows.Err(), CodeFailIterateBodyJSONKeys, "error iterating body JSON keys")
return nil, nil, false, errors.WrapInternalf(rows.Err(), CodeFailIterateBodyJSONKeys, "error iterating body JSON keys")
}
return fieldKeys, paths, rowCount <= limit, nil
}
func (t *telemetryMetaStore) buildBodyJSONPaths(ctx context.Context,
fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
fieldKeys, paths, finished, err := t.fetchBodyJSONPaths(ctx, fieldKeySelectors)
if err != nil {
return nil, false, err
}
promoted, err := t.GetPromotedPaths(ctx, paths...)
@@ -107,7 +116,34 @@ func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context,
fieldKey.Indexes = indexes[fieldKey.Name]
}
return fieldKeys, rowCount <= limit, nil
return fieldKeys, finished, t.buildJSONPlans(ctx, fieldKeys)
}
func (t *telemetryMetaStore) buildJSONPlans(ctx context.Context, keys []*telemetrytypes.TelemetryFieldKey) error {
parentSelectors := make([]*telemetrytypes.FieldKeySelector, 0, len(keys))
for _, key := range keys {
parentSelectors = append(parentSelectors, key.ArrayParentSelectors()...)
}
parentKeys, _, _, err := t.fetchBodyJSONPaths(ctx, parentSelectors)
if err != nil {
return err
}
typeCache := make(map[string][]telemetrytypes.JSONDataType)
for _, key := range parentKeys {
typeCache[key.Name] = append(typeCache[key.Name], *key.JSONDataType)
}
// build plans for keys now
for _, key := range keys {
err = key.SetJSONAccessPlan(t.jsonColumnMetadata[telemetrytypes.SignalLogs][telemetrytypes.FieldContextBody], typeCache)
if err != nil {
return err
}
}
return nil
}
func buildGetBodyJSONPathsQuery(fieldKeySelectors []*telemetrytypes.FieldKeySelector) (string, []any, int) {

View File

@@ -50,8 +50,9 @@ type telemetryMetaStore struct {
relatedMetadataDBName string
relatedMetadataTblName string
fm qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
fm qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
jsonColumnMetadata map[telemetrytypes.Signal]map[telemetrytypes.FieldContext]telemetrytypes.JSONColumnMetadata
}
func escapeForLike(s string) string {
@@ -97,6 +98,14 @@ func NewTelemetryMetaStore(
logResourceKeysTblName: logResourceKeysTblName,
relatedMetadataDBName: relatedMetadataDBName,
relatedMetadataTblName: relatedMetadataTblName,
jsonColumnMetadata: map[telemetrytypes.Signal]map[telemetrytypes.FieldContext]telemetrytypes.JSONColumnMetadata{
telemetrytypes.SignalLogs: {
telemetrytypes.FieldContextBody: telemetrytypes.JSONColumnMetadata{
BaseColumn: telemetrylogs.LogsV2BodyJSONColumn,
PromotedColumn: telemetrylogs.LogsV2BodyPromotedColumn,
},
},
},
}
fm := NewFieldMapper()
@@ -574,7 +583,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
}
if querybuilder.BodyJSONQueryEnabled {
bodyJSONPaths, finished, err := t.getBodyJSONPaths(ctx, fieldKeySelectors) // LIKE for pattern matching
bodyJSONPaths, finished, err := t.buildBodyJSONPaths(ctx, fieldKeySelectors) // LIKE for pattern matching
if err != nil {
t.logger.ErrorContext(ctx, "failed to extract body JSON paths", "error", err)
}

View File

@@ -23,7 +23,8 @@ const (
// e.g., "body.status" where "body." is the prefix
BodyJSONStringSearchPrefix = "body."
ArraySep = jsontypeexporter.ArraySeparator
ArrayAnyIndex = "[*]."
// TODO(Piyush): Remove once we've migrated to the new array syntax
ArrayAnyIndex = "[*]."
)
type TelemetryFieldKey struct {
@@ -35,10 +36,46 @@ type TelemetryFieldKey struct {
FieldDataType FieldDataType `json:"fieldDataType,omitempty"`
JSONDataType *JSONDataType `json:"-"`
JSONPlan JSONAccessPlan `json:"-"`
Indexes []JSONDataTypeIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
}
func (f *TelemetryFieldKey) KeyNameContainsArray() bool {
return strings.Contains(f.Name, ArraySep) || strings.Contains(f.Name, ArrayAnyIndex)
}
// ArrayPathSegments returns just the individual segments of the path
// e.g., "education[].awards[].type" -> ["education", "awards", "type"]
func (f *TelemetryFieldKey) ArrayPathSegments() []string {
return strings.Split(strings.ReplaceAll(f.Name, ArrayAnyIndex, ArraySep), ArraySep)
}
func (f *TelemetryFieldKey) ArrayParentPaths() []string {
parts := f.ArrayPathSegments()
paths := make([]string, 0, len(parts))
for i := range parts {
paths = append(paths, strings.Join(parts[:i+1], ArraySep))
}
return paths
}
func (f *TelemetryFieldKey) ArrayParentSelectors() []*FieldKeySelector {
paths := f.ArrayParentPaths()
selectors := make([]*FieldKeySelector, 0, len(paths))
for i := range paths {
selectors = append(selectors, &FieldKeySelector{
Name: paths[i],
SelectorMatchType: FieldSelectorMatchTypeExact,
Signal: f.Signal,
FieldContext: f.FieldContext,
Limit: 1,
})
}
return selectors
}
func (f TelemetryFieldKey) String() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("name=%s", f.Name))

View File

@@ -102,6 +102,10 @@ func (f FieldDataType) CHDataType() string {
return "String"
}
func (f FieldDataType) IsArray() bool {
return strings.HasPrefix(f.StringValue(), "[]") || strings.HasSuffix(f.StringValue(), "[]")
}
// UnmarshalJSON implements the json.Unmarshaler interface
func (f *FieldDataType) UnmarshalJSON(data []byte) error {
var str string

View File

@@ -2,9 +2,11 @@ package telemetrytypes
import (
"fmt"
"slices"
"strings"
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -15,14 +17,20 @@ type JSONAccessBranchType struct {
var (
BranchJSON = JSONAccessBranchType{valuer.NewString("json")}
BranchDynamic = JSONAccessBranchType{valuer.NewString("dynamic")}
CodePlanIndexOutOfBounds = errors.MustNewCode("plan_index_out_of_bounds")
)
type JSONColumnMetadata struct {
BaseColumn string
PromotedColumn string
}
type JSONAccessPlan = []*JSONAccessNode
type TerminalConfig struct {
Key *TelemetryFieldKey
ElemType JSONDataType
ValueType JSONDataType
Key *TelemetryFieldKey
ElemType JSONDataType
}
// Node is now a tree structure representing the complete JSON path traversal
@@ -80,3 +88,127 @@ func (n *JSONAccessNode) FieldPath() string {
key := "`" + n.Name + "`"
return n.Parent.Alias() + "." + key
}
type planBuilder struct {
key *TelemetryFieldKey
paths []string // cumulative paths for type cache lookups
segments []string // individual path segments for node names
isPromoted bool
typeCache map[string][]JSONDataType
}
// buildPlan recursively builds the path plan tree
func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChild bool) (*JSONAccessNode, error) {
if index >= len(pb.paths) {
return nil, errors.NewInvalidInputf(CodePlanIndexOutOfBounds, "index is out of bounds")
}
pathSoFar := pb.paths[index] // cumulative path for type cache lookup
segmentName := pb.segments[index] // segment name for node
isTerminal := index == len(pb.paths)-1
// Calculate progression parameters based on parent's values
var maxTypes, maxPaths int
if isDynArrChild {
// Child of Dynamic array - reset progression to base values (16, 256)
// This happens when we switch from Array(Dynamic) to Array(JSON)
maxTypes = 16
maxPaths = 256
} else if parent != nil {
// Child of JSON array - use parent's progression divided by 2 and 4
maxTypes = parent.MaxDynamicTypes / 2
maxPaths = parent.MaxDynamicPaths / 4
if maxTypes < 0 {
maxTypes = 0
}
if maxPaths < 0 {
maxPaths = 0
}
}
// Use cached types from the batched metadata query
types, ok := pb.typeCache[pathSoFar]
if !ok {
return nil, errors.NewInternalf(errors.CodeInvalidInput, "types missing for path %s", pathSoFar)
}
// Create node for this path segment
node := &JSONAccessNode{
Name: segmentName,
IsTerminal: isTerminal,
AvailableTypes: types,
Branches: make(map[JSONAccessBranchType]*JSONAccessNode),
Parent: parent,
MaxDynamicTypes: maxTypes,
MaxDynamicPaths: maxPaths,
}
hasJSON := slices.Contains(node.AvailableTypes, ArrayJSON)
hasDynamic := slices.Contains(node.AvailableTypes, ArrayDynamic)
// Configure terminal if this is the last part
if isTerminal {
node.TerminalConfig = &TerminalConfig{
Key: pb.key,
ElemType: *pb.key.JSONDataType,
}
} else {
var err error
if hasJSON {
node.Branches[BranchJSON], err = pb.buildPlan(index+1, node, false)
if err != nil {
return nil, err
}
}
if hasDynamic {
node.Branches[BranchDynamic], err = pb.buildPlan(index+1, node, true)
if err != nil {
return nil, err
}
}
}
return node, nil
}
// buildJSONAccessPlan builds a tree structure representing the complete JSON path traversal
// that precomputes all possible branches and their types
func (key *TelemetryFieldKey) SetJSONAccessPlan(columnInfo JSONColumnMetadata, typeCache map[string][]JSONDataType,
) error {
// if path is empty, return nil
if key.Name == "" {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
}
pb := &planBuilder{
key: key,
paths: key.ArrayParentPaths(),
segments: key.ArrayPathSegments(),
isPromoted: key.Materialized,
typeCache: typeCache,
}
node, err := pb.buildPlan(0,
NewRootJSONAccessNode(columnInfo.BaseColumn,
32, 0),
false,
)
if err != nil {
return err
}
key.JSONPlan = append(key.JSONPlan, node)
if pb.isPromoted {
node, err := pb.buildPlan(0,
NewRootJSONAccessNode(columnInfo.PromotedColumn,
32, 1024),
true,
)
if err != nil {
return err
}
key.JSONPlan = append(key.JSONPlan, node)
}
return nil
}

View File

@@ -1,12 +1,8 @@
package telemetrylogs
package telemetrytypes
import (
"context"
"testing"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
@@ -16,8 +12,8 @@ import (
// ============================================================================
// makeKey creates a TelemetryFieldKey for testing
func makeKey(name string, dataType telemetrytypes.JSONDataType, materialized bool) *telemetrytypes.TelemetryFieldKey {
return &telemetrytypes.TelemetryFieldKey{
func makeKey(name string, dataType JSONDataType, materialized bool) *TelemetryFieldKey {
return &TelemetryFieldKey{
Name: name,
JSONDataType: &dataType,
Materialized: materialized,
@@ -38,14 +34,13 @@ type jsonAccessTestNode struct {
MaxDynamicTypes int `yaml:"maxDynamicTypes,omitempty"`
MaxDynamicPaths int `yaml:"maxDynamicPaths,omitempty"`
ElemType string `yaml:"elemType,omitempty"`
ValueType string `yaml:"valueType,omitempty"`
AvailableTypes []string `yaml:"availableTypes,omitempty"`
Branches map[string]*jsonAccessTestNode `yaml:"branches,omitempty"`
}
// toTestNode converts a JSONAccessNode tree into jsonAccessTestNode so that
// it can be serialized to YAML for easy visual comparison in tests.
func toTestNode(n *telemetrytypes.JSONAccessNode) *jsonAccessTestNode {
func toTestNode(n *JSONAccessNode) *jsonAccessTestNode {
if n == nil {
return nil
}
@@ -74,7 +69,6 @@ func toTestNode(n *telemetrytypes.JSONAccessNode) *jsonAccessTestNode {
// Terminal config
if n.TerminalConfig != nil {
out.ElemType = n.TerminalConfig.ElemType.StringValue()
out.ValueType = n.TerminalConfig.ValueType.StringValue()
}
// Branches
@@ -90,7 +84,7 @@ func toTestNode(n *telemetrytypes.JSONAccessNode) *jsonAccessTestNode {
// plansToYAML converts a slice of JSONAccessNode plans to a YAML string that
// can be compared against a per-test expectedTree.
func plansToYAML(t *testing.T, plans []*telemetrytypes.JSONAccessNode) string {
func plansToYAML(t *testing.T, plans []*JSONAccessNode) string {
t.Helper()
testNodes := make([]*jsonAccessTestNode, 0, len(plans))
@@ -110,17 +104,17 @@ func plansToYAML(t *testing.T, plans []*telemetrytypes.JSONAccessNode) string {
func TestNode_Alias(t *testing.T) {
tests := []struct {
name string
node *telemetrytypes.JSONAccessNode
node *JSONAccessNode
expected string
}{
{
name: "Root node returns name as-is",
node: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0),
expected: LogsV2BodyJSONColumn,
node: NewRootJSONAccessNode("body_json", 32, 0),
expected: "body_json",
},
{
name: "Node without parent returns backticked name",
node: &telemetrytypes.JSONAccessNode{
node: &JSONAccessNode{
Name: "user",
Parent: nil,
},
@@ -128,36 +122,36 @@ func TestNode_Alias(t *testing.T) {
},
{
name: "Node with root parent uses dot separator",
node: &telemetrytypes.JSONAccessNode{
node: &JSONAccessNode{
Name: "age",
Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
expected: "`" + LogsV2BodyJSONColumn + ".age`",
expected: "`" + "body_json" + ".age`",
},
{
name: "Node with non-root parent uses array separator",
node: &telemetrytypes.JSONAccessNode{
node: &JSONAccessNode{
Name: "name",
Parent: &telemetrytypes.JSONAccessNode{
Parent: &JSONAccessNode{
Name: "education",
Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
},
expected: "`" + LogsV2BodyJSONColumn + ".education[].name`",
expected: "`" + "body_json" + ".education[].name`",
},
{
name: "Nested array path with multiple levels",
node: &telemetrytypes.JSONAccessNode{
node: &JSONAccessNode{
Name: "type",
Parent: &telemetrytypes.JSONAccessNode{
Parent: &JSONAccessNode{
Name: "awards",
Parent: &telemetrytypes.JSONAccessNode{
Parent: &JSONAccessNode{
Name: "education",
Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
},
},
expected: "`" + LogsV2BodyJSONColumn + ".education[].awards[].type`",
expected: "`" + "body_json" + ".education[].awards[].type`",
},
}
@@ -172,49 +166,49 @@ func TestNode_Alias(t *testing.T) {
func TestNode_FieldPath(t *testing.T) {
tests := []struct {
name string
node *telemetrytypes.JSONAccessNode
node *JSONAccessNode
expected string
}{
{
name: "Simple field path from root",
node: &telemetrytypes.JSONAccessNode{
node: &JSONAccessNode{
Name: "user",
Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
// FieldPath() always wraps the field name in backticks
expected: LogsV2BodyJSONColumn + ".`user`",
expected: "body_json" + ".`user`",
},
{
name: "Field path with backtick-required key",
node: &telemetrytypes.JSONAccessNode{
node: &JSONAccessNode{
Name: "user-name", // requires backtick
Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
expected: LogsV2BodyJSONColumn + ".`user-name`",
expected: "body_json" + ".`user-name`",
},
{
name: "Nested field path",
node: &telemetrytypes.JSONAccessNode{
node: &JSONAccessNode{
Name: "age",
Parent: &telemetrytypes.JSONAccessNode{
Parent: &JSONAccessNode{
Name: "user",
Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
},
// FieldPath() always wraps the field name in backticks
expected: "`" + LogsV2BodyJSONColumn + ".user`.`age`",
expected: "`" + "body_json" + ".user`.`age`",
},
{
name: "Array element field path",
node: &telemetrytypes.JSONAccessNode{
node: &JSONAccessNode{
Name: "name",
Parent: &telemetrytypes.JSONAccessNode{
Parent: &JSONAccessNode{
Name: "education",
Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0),
Parent: NewRootJSONAccessNode("body_json", 32, 0),
},
},
// FieldPath() always wraps the field name in backticks
expected: "`" + LogsV2BodyJSONColumn + ".education`.`name`",
expected: "`" + "body_json" + ".education`.`name`",
},
}
@@ -231,17 +225,17 @@ func TestNode_FieldPath(t *testing.T) {
// ============================================================================
func TestPlanJSON_BasicStructure(t *testing.T) {
_, metadataStore := testTypeSet()
types, _ := TestJSONTypeSet()
tests := []struct {
name string
key *telemetrytypes.TelemetryFieldKey
key *TelemetryFieldKey
expectErr bool
expectedYAML string
}{
{
name: "Simple path not promoted",
key: makeKey("user.name", telemetrytypes.String, false),
key: makeKey("user.name", String, false),
expectedYAML: `
- name: user.name
column: body_json
@@ -250,12 +244,11 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
maxDynamicTypes: 16
isTerminal: true
elemType: String
valueType: String
`,
},
{
name: "Simple path promoted",
key: makeKey("user.name", telemetrytypes.String, true),
key: makeKey("user.name", String, true),
expectedYAML: `
- name: user.name
column: body_json
@@ -264,7 +257,6 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
maxDynamicTypes: 16
isTerminal: true
elemType: String
valueType: String
- name: user.name
column: body_json_promoted
availableTypes:
@@ -273,12 +265,11 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
valueType: String
`,
},
{
name: "Empty path returns error",
key: makeKey("", telemetrytypes.String, false),
key: makeKey("", String, false),
expectErr: true,
expectedYAML: "",
},
@@ -286,21 +277,24 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plans, err := PlanJSON(context.Background(), tt.key, qbtypes.FilterOperatorEqual, "John", metadataStore)
err := tt.key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
if tt.expectErr {
require.Error(t, err)
require.Nil(t, plans)
require.Nil(t, tt.key.JSONPlan)
return
}
require.NoError(t, err)
got := plansToYAML(t, plans)
got := plansToYAML(t, tt.key.JSONPlan)
require.YAMLEq(t, tt.expectedYAML, got)
})
}
}
func TestPlanJSON_ArrayPaths(t *testing.T) {
_, metadataStore := testTypeSet()
types, _ := TestJSONTypeSet()
tests := []struct {
name string
@@ -324,7 +318,6 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
maxDynamicTypes: 8
isTerminal: true
elemType: String
valueType: String
`,
},
{
@@ -351,7 +344,6 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
maxDynamicTypes: 4
isTerminal: true
elemType: String
valueType: String
dynamic:
name: type
availableTypes:
@@ -360,7 +352,6 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
valueType: String
`,
},
{
@@ -408,7 +399,6 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
- String
isTerminal: true
elemType: String
valueType: String
`,
},
{
@@ -428,34 +418,38 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
maxDynamicTypes: 8
isTerminal: true
elemType: String
valueType: String
`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
key := makeKey(tt.path, telemetrytypes.String, false)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, "John", metadataStore)
key := makeKey(tt.path, String, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
require.NoError(t, err)
require.NotNil(t, plans)
require.Len(t, plans, 1)
got := plansToYAML(t, plans)
require.NotNil(t, key.JSONPlan)
require.Len(t, key.JSONPlan, 1)
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, tt.expectedYAML, got)
})
}
}
func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
_, metadataStore := testTypeSet()
types, _ := TestJSONTypeSet()
path := "education[].awards[].type"
value := "sports"
t.Run("Non-promoted plan", func(t *testing.T) {
key := makeKey(path, telemetrytypes.String, false)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, value, metadataStore)
key := makeKey(path, String, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
require.NoError(t, err)
require.Len(t, plans, 1)
require.Len(t, key.JSONPlan, 1)
expectedYAML := `
- name: education
@@ -478,7 +472,6 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
maxDynamicTypes: 4
isTerminal: true
elemType: String
valueType: String
dynamic:
name: type
availableTypes:
@@ -487,17 +480,19 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
valueType: String
`
got := plansToYAML(t, plans)
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, expectedYAML, got)
})
t.Run("Promoted plan", func(t *testing.T) {
key := makeKey(path, telemetrytypes.String, true)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, value, metadataStore)
key := makeKey(path, String, true)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
require.NoError(t, err)
require.Len(t, plans, 2)
require.Len(t, key.JSONPlan, 2)
expectedYAML := `
- name: education
@@ -520,7 +515,6 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
maxDynamicTypes: 4
isTerminal: true
elemType: String
valueType: String
dynamic:
name: type
availableTypes:
@@ -529,7 +523,6 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
valueType: String
- name: education
column: body_json_promoted
availableTypes:
@@ -553,7 +546,6 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
maxDynamicPaths: 16
isTerminal: true
elemType: String
valueType: String
dynamic:
name: type
availableTypes:
@@ -562,39 +554,29 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
maxDynamicPaths: 256
isTerminal: true
elemType: String
valueType: String
`
got := plansToYAML(t, plans)
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, expectedYAML, got)
})
}
func TestPlanJSON_EdgeCases(t *testing.T) {
_, metadataStore := testTypeSet()
types, _ := TestJSONTypeSet()
tests := []struct {
name string
path string
value any
expectedYAML string
expectErr bool
}{
{
name: "Path with no available types",
path: "unknown.path",
value: "test",
expectedYAML: `
- name: unknown.path
column: body_json
maxDynamicTypes: 16
isTerminal: true
elemType: String
valueType: String
`,
name: "Path with no available types",
path: "unknown.path",
expectErr: true,
},
{
name: "Very deep nesting - validates progression doesn't go negative",
path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name",
value: "Engineer",
expectedYAML: `
- name: interests
column: body_json
@@ -637,13 +619,11 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
- String
isTerminal: true
elemType: String
valueType: String
`,
},
{
name: "Path with mixed scalar and array types",
path: "education[].type",
value: "high_school",
expectedYAML: `
- name: education
column: body_json
@@ -659,13 +639,11 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
maxDynamicTypes: 8
isTerminal: true
elemType: String
valueType: String
`,
},
{
name: "Exists with only array types available",
path: "education",
value: nil,
expectedYAML: `
- name: education
column: body_json
@@ -681,29 +659,39 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Choose key type based on path; operator does not affect the tree shape asserted here.
keyType := telemetrytypes.String
keyType := String
switch tt.path {
case "education":
keyType = telemetrytypes.ArrayJSON
keyType = ArrayJSON
case "education[].type":
keyType = telemetrytypes.String
keyType = String
}
key := makeKey(tt.path, keyType, false)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, tt.value, metadataStore)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
if tt.expectErr {
require.Error(t, err)
return
}
require.NoError(t, err)
got := plansToYAML(t, plans)
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, tt.expectedYAML, got)
})
}
}
func TestPlanJSON_TreeStructure(t *testing.T) {
_, metadataStore := testTypeSet()
types, _ := TestJSONTypeSet()
path := "education[].awards[].participated[].team[].branch"
key := makeKey(path, telemetrytypes.String, false)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, "John", metadataStore)
key := makeKey(path, String, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: "body_json",
PromotedColumn: "body_json_promoted",
}, types)
require.NoError(t, err)
require.Len(t, plans, 1)
require.Len(t, key.JSONPlan, 1)
expectedYAML := `
- name: education
@@ -739,7 +727,6 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
maxDynamicTypes: 1
isTerminal: true
elemType: String
valueType: String
dynamic:
name: team
availableTypes:
@@ -755,7 +742,6 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
maxDynamicPaths: 64
isTerminal: true
elemType: String
valueType: String
dynamic:
name: participated
availableTypes:
@@ -779,7 +765,6 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
maxDynamicPaths: 16
isTerminal: true
elemType: String
valueType: String
dynamic:
name: team
availableTypes:
@@ -795,89 +780,8 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
maxDynamicPaths: 64
isTerminal: true
elemType: String
valueType: String
`
got := plansToYAML(t, plans)
got := plansToYAML(t, key.JSONPlan)
require.YAMLEq(t, expectedYAML, got)
}
// ============================================================================
// Test Data Setup
// ============================================================================
// testTypeSet returns a map of path->types and a mock MetadataStore for testing
// This represents the type information available in the test JSON structure
func testTypeSet() (map[string][]telemetrytypes.JSONDataType, telemetrytypes.MetadataStore) {
types := map[string][]telemetrytypes.JSONDataType{
"user.name": {telemetrytypes.String},
"user.age": {telemetrytypes.Int64, telemetrytypes.String},
"user.height": {telemetrytypes.Float64},
"education": {telemetrytypes.ArrayJSON},
"education[].name": {telemetrytypes.String},
"education[].type": {telemetrytypes.String, telemetrytypes.Int64},
"education[].internal_type": {telemetrytypes.String},
"education[].metadata.location": {telemetrytypes.String},
"education[].parameters": {telemetrytypes.ArrayFloat64, telemetrytypes.ArrayDynamic},
"education[].duration": {telemetrytypes.String},
"education[].mode": {telemetrytypes.String},
"education[].year": {telemetrytypes.Int64},
"education[].field": {telemetrytypes.String},
"education[].awards": {telemetrytypes.ArrayDynamic, telemetrytypes.ArrayJSON},
"education[].awards[].name": {telemetrytypes.String},
"education[].awards[].rank": {telemetrytypes.Int64},
"education[].awards[].medal": {telemetrytypes.String},
"education[].awards[].type": {telemetrytypes.String},
"education[].awards[].semester": {telemetrytypes.Int64},
"education[].awards[].participated": {telemetrytypes.ArrayDynamic, telemetrytypes.ArrayJSON},
"education[].awards[].participated[].type": {telemetrytypes.String},
"education[].awards[].participated[].field": {telemetrytypes.String},
"education[].awards[].participated[].project_type": {telemetrytypes.String},
"education[].awards[].participated[].project_name": {telemetrytypes.String},
"education[].awards[].participated[].race_type": {telemetrytypes.String},
"education[].awards[].participated[].team_based": {telemetrytypes.Bool},
"education[].awards[].participated[].team_name": {telemetrytypes.String},
"education[].awards[].participated[].team": {telemetrytypes.ArrayJSON},
"education[].awards[].participated[].team[].name": {telemetrytypes.String},
"education[].awards[].participated[].team[].branch": {telemetrytypes.String},
"education[].awards[].participated[].team[].semester": {telemetrytypes.Int64},
"interests": {telemetrytypes.ArrayJSON},
"interests[].type": {telemetrytypes.String},
"interests[].entities": {telemetrytypes.ArrayJSON},
"interests[].entities.application_date": {telemetrytypes.String},
"interests[].entities[].reviews": {telemetrytypes.ArrayJSON},
"interests[].entities[].reviews[].given_by": {telemetrytypes.String},
"interests[].entities[].reviews[].remarks": {telemetrytypes.String},
"interests[].entities[].reviews[].weight": {telemetrytypes.Float64},
"interests[].entities[].reviews[].passed": {telemetrytypes.Bool},
"interests[].entities[].reviews[].type": {telemetrytypes.String},
"interests[].entities[].reviews[].analysis_type": {telemetrytypes.Int64},
"interests[].entities[].reviews[].entries": {telemetrytypes.ArrayJSON},
"interests[].entities[].reviews[].entries[].subject": {telemetrytypes.String},
"interests[].entities[].reviews[].entries[].status": {telemetrytypes.String},
"interests[].entities[].reviews[].entries[].metadata": {telemetrytypes.ArrayJSON},
"interests[].entities[].reviews[].entries[].metadata[].company": {telemetrytypes.String},
"interests[].entities[].reviews[].entries[].metadata[].experience": {telemetrytypes.Int64},
"interests[].entities[].reviews[].entries[].metadata[].unit": {telemetrytypes.String},
"interests[].entities[].reviews[].entries[].metadata[].positions": {telemetrytypes.ArrayJSON},
"interests[].entities[].reviews[].entries[].metadata[].positions[].name": {telemetrytypes.String},
"interests[].entities[].reviews[].entries[].metadata[].positions[].duration": {telemetrytypes.Int64, telemetrytypes.Float64},
"interests[].entities[].reviews[].entries[].metadata[].positions[].unit": {telemetrytypes.String},
"interests[].entities[].reviews[].entries[].metadata[].positions[].ratings": {telemetrytypes.ArrayInt64, telemetrytypes.ArrayString},
"message": {telemetrytypes.String},
}
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
for path, dataTypes := range types {
for _, dataType := range dataTypes {
mockMetadataStore.SetKey(&telemetrytypes.TelemetryFieldKey{
Name: path,
JSONDataType: &dataType,
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextBody,
FieldDataType: telemetrytypes.MappingJSONDataTypeToFieldDataType[dataType],
})
}
}
return types, mockMetadataStore
}

View File

@@ -0,0 +1,71 @@
package telemetrytypes
// ============================================================================
// Test JSON Type Set Data Setup
// ============================================================================
// TestJSONTypeSet returns a map of path->types for testing
// This represents the type information available in the test JSON structure
func TestJSONTypeSet() (map[string][]JSONDataType, MetadataStore) {
types := map[string][]JSONDataType{
"user.name": {String},
"user.permissions": {ArrayString},
"user.age": {Int64, String},
"user.height": {Float64},
"education": {ArrayJSON},
"education[].name": {String},
"education[].type": {String, Int64},
"education[].internal_type": {String},
"education[].metadata.location": {String},
"education[].parameters": {ArrayFloat64, ArrayDynamic},
"education[].duration": {String},
"education[].mode": {String},
"education[].year": {Int64},
"education[].field": {String},
"education[].awards": {ArrayDynamic, ArrayJSON},
"education[].awards[].name": {String},
"education[].awards[].rank": {Int64},
"education[].awards[].medal": {String},
"education[].awards[].type": {String},
"education[].awards[].semester": {Int64},
"education[].awards[].participated": {ArrayDynamic, ArrayJSON},
"education[].awards[].participated[].type": {String},
"education[].awards[].participated[].field": {String},
"education[].awards[].participated[].project_type": {String},
"education[].awards[].participated[].project_name": {String},
"education[].awards[].participated[].race_type": {String},
"education[].awards[].participated[].team_based": {Bool},
"education[].awards[].participated[].team_name": {String},
"education[].awards[].participated[].team": {ArrayJSON},
"education[].awards[].participated[].members": {ArrayString},
"education[].awards[].participated[].team[].name": {String},
"education[].awards[].participated[].team[].branch": {String},
"education[].awards[].participated[].team[].semester": {Int64},
"interests": {ArrayJSON},
"interests[].type": {String},
"interests[].entities": {ArrayJSON},
"interests[].entities.application_date": {String},
"interests[].entities[].reviews": {ArrayJSON},
"interests[].entities[].reviews[].given_by": {String},
"interests[].entities[].reviews[].remarks": {String},
"interests[].entities[].reviews[].weight": {Float64},
"interests[].entities[].reviews[].passed": {Bool},
"interests[].entities[].reviews[].type": {String},
"interests[].entities[].reviews[].analysis_type": {Int64},
"interests[].entities[].reviews[].entries": {ArrayJSON},
"interests[].entities[].reviews[].entries[].subject": {String},
"interests[].entities[].reviews[].entries[].status": {String},
"interests[].entities[].reviews[].entries[].metadata": {ArrayJSON},
"interests[].entities[].reviews[].entries[].metadata[].company": {String},
"interests[].entities[].reviews[].entries[].metadata[].experience": {Int64},
"interests[].entities[].reviews[].entries[].metadata[].unit": {String},
"interests[].entities[].reviews[].entries[].metadata[].positions": {ArrayJSON},
"interests[].entities[].reviews[].entries[].metadata[].positions[].name": {String},
"interests[].entities[].reviews[].entries[].metadata[].positions[].duration": {Int64, Float64},
"interests[].entities[].reviews[].entries[].metadata[].positions[].unit": {String},
"interests[].entities[].reviews[].entries[].metadata[].positions[].ratings": {ArrayInt64, ArrayString},
"message": {String},
}
return types, nil
}