From 7274d51236132066e154b2190ad2918ee2be1185 Mon Sep 17 00:00:00 2001 From: Piyush Singariya Date: Thu, 29 Jan 2026 14:53:54 +0530 Subject: [PATCH] feat: has function support New JSON QB (#10050) * feat: has JSON QB * fix: tests expected queries and values * fix: ignored .vscode in gitignore * fix: tests GroupBy * revert: gitignore change * fix: build json plans in metadata * fix: empty filteredArrays condition * fix: tests * fix: tests * fix: json qb test fix * fix: review based on tushar * fix: changes based on review from Srikanth * fix: remove unnecessary bool checking * fix: removed comment * chore: var renamed * fix: merge conflict * test: fix * fix: tests * fix: go test flakiness --------- Co-authored-by: Srikanth Chekuri --- .gitignore | 1 - pkg/querier/signozquerier/provider.go | 2 +- pkg/querybuilder/collision.go | 5 +- pkg/querybuilder/where_clause_visitor.go | 27 +- pkg/telemetrylogs/condition_builder.go | 10 +- pkg/telemetrylogs/condition_builder_test.go | 9 +- pkg/telemetrylogs/field_mapper.go | 138 ++++++++- .../filter_expr_like_warning_test.go | 4 +- .../filter_expr_logs_body_json_test.go | 3 +- pkg/telemetrylogs/filter_expr_logs_test.go | 4 +- pkg/telemetrylogs/json_access_pb.go | 179 ----------- pkg/telemetrylogs/json_condition_builder.go | 217 ++----------- pkg/telemetrylogs/json_stmt_builder_test.go | 95 +++++- pkg/telemetrylogs/json_string.go | 6 +- pkg/telemetrylogs/stmt_builder_test.go | 22 +- pkg/telemetrymetadata/body_json_metadata.go | 54 +++- pkg/telemetrymetadata/metadata.go | 15 +- pkg/types/telemetrytypes/field.go | 39 ++- pkg/types/telemetrytypes/field_datatype.go | 4 + pkg/types/telemetrytypes/json_access_plan.go | 145 ++++++++- .../telemetrytypes/json_access_plan_test.go} | 288 ++++++------------ pkg/types/telemetrytypes/test_data.go | 71 +++++ 22 files changed, 707 insertions(+), 631 deletions(-) delete mode 100644 pkg/telemetrylogs/json_access_pb.go rename pkg/{telemetrylogs/json_access_pb_test.go => types/telemetrytypes/json_access_plan_test.go} (59%) create mode 100644 pkg/types/telemetrytypes/test_data.go diff --git a/.gitignore b/.gitignore index 3589864696..dbe82abd10 100644 --- a/.gitignore +++ b/.gitignore @@ -57,7 +57,6 @@ bin/ .local/ */query-service/queries.active ee/query-service/db - # e2e e2e/node_modules/ diff --git a/pkg/querier/signozquerier/provider.go b/pkg/querier/signozquerier/provider.go index f19f6a1d27..77c15cfc65 100644 --- a/pkg/querier/signozquerier/provider.go +++ b/pkg/querier/signozquerier/provider.go @@ -105,7 +105,7 @@ func newProvider( // Create log statement builder logFieldMapper := telemetrylogs.NewFieldMapper() - logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper, telemetryMetadataStore) + logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper) logResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder( settings, resourceFilterFieldMapper, diff --git a/pkg/querybuilder/collision.go b/pkg/querybuilder/collision.go index 5d6dc450a3..e0e1686131 100644 --- a/pkg/querybuilder/collision.go +++ b/pkg/querybuilder/collision.go @@ -152,7 +152,7 @@ func AdjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemet key.JSONDataType = intrinsicOrCalculatedField.JSONDataType key.Indexes = intrinsicOrCalculatedField.Indexes key.Materialized = intrinsicOrCalculatedField.Materialized - + key.JSONPlan = intrinsicOrCalculatedField.JSONPlan return actions } @@ -204,7 +204,8 @@ func AdjustKey(key *telemetrytypes.TelemetryFieldKey, keys map[string][]*telemet key.JSONDataType = matchingKey.JSONDataType key.Indexes = matchingKey.Indexes key.Materialized = matchingKey.Materialized - + key.JSONPlan = matchingKey.JSONPlan + return actions } else { // multiple matching keys, set materialized only if all the keys are materialized diff --git a/pkg/querybuilder/where_clause_visitor.go b/pkg/querybuilder/where_clause_visitor.go index f2e757a426..93301e0d7a 100644 --- a/pkg/querybuilder/where_clause_visitor.go +++ b/pkg/querybuilder/where_clause_visitor.go @@ -690,6 +690,22 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon v.errors = append(v.errors, fmt.Sprintf("function `%s` expects key parameter to be a field key", functionName)) return "" } + + // filter arrays from keys + if BodyJSONQueryEnabled && functionName != "hasToken" { + filteredKeys := []*telemetrytypes.TelemetryFieldKey{} + for _, key := range keys { + if key.FieldDataType.IsArray() { + filteredKeys = append(filteredKeys, key) + } + } + if len(filteredKeys) == 0 { + v.errors = append(v.errors, fmt.Sprintf("function `%s` expects key parameter to be an array field; no array fields found", functionName)) + return "" + } + keys = filteredKeys + } + value := params[1:] var conds []string for _, key := range keys { @@ -716,7 +732,16 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon } else { // this is that all other functions only support array fields if key.FieldContext == telemetrytypes.FieldContextBody { - fieldName, _ = v.jsonKeyToKey(context.Background(), key, qbtypes.FilterOperatorUnknown, value) + var err error + if BodyJSONQueryEnabled { + fieldName, err = v.fieldMapper.FieldFor(context.Background(), key) + if err != nil { + v.errors = append(v.errors, fmt.Sprintf("failed to get field name for key %s: %s", key.Name, err.Error())) + return "" + } + } else { + fieldName, _ = v.jsonKeyToKey(context.Background(), key, qbtypes.FilterOperatorUnknown, value) + } } else { // TODO(add docs for json body search) if v.mainErrorURL == "" { diff --git a/pkg/telemetrylogs/condition_builder.go b/pkg/telemetrylogs/condition_builder.go index c3a0e23402..3106786b2d 100644 --- a/pkg/telemetrylogs/condition_builder.go +++ b/pkg/telemetrylogs/condition_builder.go @@ -16,12 +16,11 @@ import ( ) type conditionBuilder struct { - fm qbtypes.FieldMapper - metadataStore telemetrytypes.MetadataStore + fm qbtypes.FieldMapper } -func NewConditionBuilder(fm qbtypes.FieldMapper, metadataStore telemetrytypes.MetadataStore) *conditionBuilder { - return &conditionBuilder{fm: fm, metadataStore: metadataStore} +func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder { + return &conditionBuilder{fm: fm} } func (c *conditionBuilder) conditionFor( @@ -37,7 +36,8 @@ func (c *conditionBuilder) conditionFor( } if column.IsJSONColumn() && querybuilder.BodyJSONQueryEnabled { - cond, err := c.buildJSONCondition(ctx, key, operator, value, sb) + valueType, value := InferDataType(value, operator, key) + cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb) if err != nil { return "", err } diff --git a/pkg/telemetrylogs/condition_builder_test.go b/pkg/telemetrylogs/condition_builder_test.go index 40514c2625..739b21fb31 100644 --- a/pkg/telemetrylogs/condition_builder_test.go +++ b/pkg/telemetrylogs/condition_builder_test.go @@ -373,8 +373,7 @@ func TestConditionFor(t *testing.T) { } fm := NewFieldMapper() - mockMetadataStore := buildTestTelemetryMetadataStore() - conditionBuilder := NewConditionBuilder(fm, mockMetadataStore) + conditionBuilder := NewConditionBuilder(fm) for _, tc := range testCases { sb := sqlbuilder.NewSelectBuilder() @@ -427,8 +426,7 @@ func TestConditionForMultipleKeys(t *testing.T) { } fm := NewFieldMapper() - mockMetadataStore := buildTestTelemetryMetadataStore() - conditionBuilder := NewConditionBuilder(fm, mockMetadataStore) + conditionBuilder := NewConditionBuilder(fm) for _, tc := range testCases { sb := sqlbuilder.NewSelectBuilder() @@ -687,8 +685,7 @@ func TestConditionForJSONBodySearch(t *testing.T) { } fm := NewFieldMapper() - mockMetadataStore := buildTestTelemetryMetadataStore() - conditionBuilder := NewConditionBuilder(fm, mockMetadataStore) + conditionBuilder := NewConditionBuilder(fm) for _, tc := range testCases { sb := sqlbuilder.NewSelectBuilder() diff --git a/pkg/telemetrylogs/field_mapper.go b/pkg/telemetrylogs/field_mapper.go index 05386aefb6..c06c03261d 100644 --- a/pkg/telemetrylogs/field_mapper.go +++ b/pkg/telemetrylogs/field_mapper.go @@ -140,20 +140,15 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr } return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil case telemetrytypes.FieldContextBody: - if querybuilder.BodyJSONQueryEnabled && (strings.Contains(key.Name, telemetrytypes.ArraySep) || strings.Contains(key.Name, telemetrytypes.ArrayAnyIndex)) { - return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "FieldFor not supported for the Array Paths: %s", key.Name) - } if key.JSONDataType == nil { return "", qbtypes.ErrColumnNotFound } - fieldExpr := BodyJSONColumnPrefix + fmt.Sprintf("`%s`", key.Name) - expr := fmt.Sprintf("dynamicElement(%s, '%s')", fieldExpr, key.JSONDataType.StringValue()) - if key.Materialized { - promotedFieldExpr := BodyPromotedColumnPrefix + fmt.Sprintf("`%s`", key.Name) - expr = fmt.Sprintf("coalesce(%s, %s)", expr, fmt.Sprintf("dynamicElement(%s, '%s')", promotedFieldExpr, key.JSONDataType.StringValue())) + if key.KeyNameContainsArray() && !key.JSONDataType.IsArray { + return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "FieldFor not supported for nested fields; only supported for flat paths (e.g. body.status.detail) and paths of Array type: %s(%s)", key.Name, key.FieldDataType) } - return expr, nil + + return m.buildFieldForJSON(key) default: return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String) } @@ -240,3 +235,128 @@ func (m *fieldMapper) ColumnExpressionFor( return fmt.Sprintf("%s AS `%s`", sqlbuilder.Escape(colName), field.Name), nil } + +// buildFieldForJSON builds the field expression for body JSON fields using arrayConcat pattern +func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (string, error) { + plan := key.JSONPlan + if len(plan) == 0 { + return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, + "Could not find any valid paths for: %s", key.Name) + } + + if plan[0].IsTerminal { + node := plan[0] + + expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue()) + if key.Materialized { + if len(plan) < 2 { + return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing, + "plan length is less than 2 for promoted path: %s", key.Name) + } + + // promoted column first then body_json column + // TODO(Piyush): Change this in future for better performance + expr = fmt.Sprintf("coalesce(%s, %s)", + fmt.Sprintf("dynamicElement(%s, '%s')", plan[1].FieldPath(), plan[1].TerminalConfig.ElemType.StringValue()), + expr, + ) + } + + return expr, nil + } + + // Build arrayConcat pattern directly from the tree structure + arrayConcatExpr, err := m.buildArrayConcat(plan) + if err != nil { + return "", err + } + + return arrayConcatExpr, nil +} + +// buildArrayConcat builds the arrayConcat pattern directly from the tree structure +func (m *fieldMapper) buildArrayConcat(plan telemetrytypes.JSONAccessPlan) (string, error) { + if len(plan) == 0 { + return "", errors.Newf(errors.TypeInternal, CodeGroupByPlanEmpty, "group by plan is empty while building arrayConcat") + } + + // Build arrayMap expressions for ALL available branches at the root level. + // Iterate branches in deterministic order (JSON then Dynamic) so generated SQL + // is stable across environments; map iteration order is random in Go. + var arrayMapExpressions []string + for _, node := range plan { + for _, branchType := range node.BranchesInOrder() { + expr, err := m.buildArrayMap(node, branchType) + if err != nil { + return "", err + } + arrayMapExpressions = append(arrayMapExpressions, expr) + } + } + if len(arrayMapExpressions) == 0 { + return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat") + } + + // Build the arrayConcat expression + arrayConcatExpr := fmt.Sprintf("arrayConcat(%s)", strings.Join(arrayMapExpressions, ", ")) + + // Wrap with arrayFlatten + arrayFlattenExpr := fmt.Sprintf("arrayFlatten(%s)", arrayConcatExpr) + + return arrayFlattenExpr, nil +} + +// buildArrayMap builds the arrayMap expression for a specific branch, handling all sub-branches +func (m *fieldMapper) buildArrayMap(currentNode *telemetrytypes.JSONAccessNode, branchType telemetrytypes.JSONAccessBranchType) (string, error) { + if currentNode == nil { + return "", errors.Newf(errors.TypeInternal, CodeCurrentNodeNil, "current node is nil while building arrayMap") + } + + childNode := currentNode.Branches[branchType] + if childNode == nil { + return "", errors.Newf(errors.TypeInternal, CodeChildNodeNil, "child node is nil while building arrayMap") + } + + // Build the array expression for this level + var arrayExpr string + if branchType == telemetrytypes.BranchJSON { + // Array(JSON) branch + arrayExpr = fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')", + currentNode.FieldPath(), currentNode.MaxDynamicTypes, currentNode.MaxDynamicPaths) + } else { + // Array(Dynamic) branch - filter for JSON objects + dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", currentNode.FieldPath()) + arrayExpr = fmt.Sprintf("arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr) + } + + // If this is the terminal level, return the simple arrayMap + if childNode.IsTerminal { + dynamicElementExpr := fmt.Sprintf("dynamicElement(%s, '%s')", childNode.FieldPath(), + childNode.TerminalConfig.ElemType.StringValue(), + ) + return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), dynamicElementExpr, arrayExpr), nil + } + + // For non-terminal nodes, we need to handle ALL possible branches at the next level. + // Use deterministic branch order so generated SQL is stable across environments. + var nestedExpressions []string + for _, branchType := range childNode.BranchesInOrder() { + expr, err := m.buildArrayMap(childNode, branchType) + if err != nil { + return "", err + } + nestedExpressions = append(nestedExpressions, expr) + } + + // If we have multiple nested expressions, we need to concat them + var nestedExpr string + if len(nestedExpressions) == 1 { + nestedExpr = nestedExpressions[0] + } else if len(nestedExpressions) > 1 { + nestedExpr = fmt.Sprintf("arrayConcat(%s)", strings.Join(nestedExpressions, ", ")) + } else { + return "", errors.Newf(errors.TypeInternal, CodeNestedExpressionsEmpty, "nested expressions are empty while building arrayMap") + } + + return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), nestedExpr, arrayExpr), nil +} diff --git a/pkg/telemetrylogs/filter_expr_like_warning_test.go b/pkg/telemetrylogs/filter_expr_like_warning_test.go index 5562b3ad29..9898495ea9 100644 --- a/pkg/telemetrylogs/filter_expr_like_warning_test.go +++ b/pkg/telemetrylogs/filter_expr_like_warning_test.go @@ -11,7 +11,7 @@ import ( // TestLikeAndILikeWithoutWildcards_Warns Tests that LIKE/ILIKE without wildcards add warnings and include docs URL func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) { fm := NewFieldMapper() - cb := NewConditionBuilder(fm, nil) + cb := NewConditionBuilder(fm) keys := buildCompleteFieldKeyMap() @@ -47,7 +47,7 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) { // TestLikeAndILikeWithWildcards_NoWarn Tests that LIKE/ILIKE with wildcards do not add warnings func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) { fm := NewFieldMapper() - cb := NewConditionBuilder(fm, nil) + cb := NewConditionBuilder(fm) keys := buildCompleteFieldKeyMap() diff --git a/pkg/telemetrylogs/filter_expr_logs_body_json_test.go b/pkg/telemetrylogs/filter_expr_logs_body_json_test.go index ca881ac7fd..037deb44ae 100644 --- a/pkg/telemetrylogs/filter_expr_logs_body_json_test.go +++ b/pkg/telemetrylogs/filter_expr_logs_body_json_test.go @@ -7,7 +7,6 @@ import ( "github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest" "github.com/SigNoz/signoz/pkg/querybuilder" "github.com/SigNoz/signoz/pkg/types/telemetrytypes" - "github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest" "github.com/huandu/go-sqlbuilder" "github.com/stretchr/testify/require" ) @@ -15,7 +14,7 @@ import ( // TestFilterExprLogsBodyJSON tests a comprehensive set of query patterns for body JSON search func TestFilterExprLogsBodyJSON(t *testing.T) { fm := NewFieldMapper() - cb := NewConditionBuilder(fm, telemetrytypestest.NewMockMetadataStore()) + cb := NewConditionBuilder(fm) // Define a comprehensive set of field keys to support all test cases keys := buildCompleteFieldKeyMap() diff --git a/pkg/telemetrylogs/filter_expr_logs_test.go b/pkg/telemetrylogs/filter_expr_logs_test.go index 1172eb4fbd..4d8eb3ed91 100644 --- a/pkg/telemetrylogs/filter_expr_logs_test.go +++ b/pkg/telemetrylogs/filter_expr_logs_test.go @@ -16,7 +16,7 @@ import ( // TestFilterExprLogs tests a comprehensive set of query patterns for logs search func TestFilterExprLogs(t *testing.T) { fm := NewFieldMapper() - cb := NewConditionBuilder(fm, nil) + cb := NewConditionBuilder(fm) // Define a comprehensive set of field keys to support all test cases keys := buildCompleteFieldKeyMap() @@ -2423,7 +2423,7 @@ func TestFilterExprLogs(t *testing.T) { // TestFilterExprLogs tests a comprehensive set of query patterns for logs search func TestFilterExprLogsConflictNegation(t *testing.T) { fm := NewFieldMapper() - cb := NewConditionBuilder(fm, nil) + cb := NewConditionBuilder(fm) // Define a comprehensive set of field keys to support all test cases keys := buildCompleteFieldKeyMap() diff --git a/pkg/telemetrylogs/json_access_pb.go b/pkg/telemetrylogs/json_access_pb.go deleted file mode 100644 index aece538aab..0000000000 --- a/pkg/telemetrylogs/json_access_pb.go +++ /dev/null @@ -1,179 +0,0 @@ -package telemetrylogs - -import ( - "context" - "slices" - "strings" - - "github.com/SigNoz/signoz/pkg/errors" - qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" - "github.com/SigNoz/signoz/pkg/types/telemetrytypes" -) - -var ( - CodePlanIndexOutOfBounds = errors.MustNewCode("plan_index_out_of_bounds") -) - -type JSONAccessPlanBuilder struct { - key *telemetrytypes.TelemetryFieldKey - value any - op qbtypes.FilterOperator - parts []string - isPromoted bool - typeCache map[string][]telemetrytypes.JSONDataType -} - -// buildPlan recursively builds the path plan tree -func (pb *JSONAccessPlanBuilder) buildPlan(index int, parent *telemetrytypes.JSONAccessNode, isDynArrChild bool) (*telemetrytypes.JSONAccessNode, error) { - if index >= len(pb.parts) { - return nil, errors.NewInvalidInputf(CodePlanIndexOutOfBounds, "index is out of bounds") - } - - part := pb.parts[index] - pathSoFar := strings.Join(pb.parts[:index+1], telemetrytypes.ArraySep) - isTerminal := index == len(pb.parts)-1 - - // Calculate progression parameters based on parent's values - var maxTypes, maxPaths int - if isDynArrChild { - // Child of Dynamic array - reset progression to base values (16, 256) - // This happens when we switch from Array(Dynamic) to Array(JSON) - maxTypes = 16 - maxPaths = 256 - } else if parent != nil { - // Child of JSON array - use parent's progression divided by 2 and 4 - maxTypes = parent.MaxDynamicTypes / 2 - maxPaths = parent.MaxDynamicPaths / 4 - if maxTypes < 0 { - maxTypes = 0 - } - if maxPaths < 0 { - maxPaths = 0 - } - } - - // Use cached types from the batched metadata query - types := pb.typeCache[pathSoFar] - - // Create node for this path segment - node := &telemetrytypes.JSONAccessNode{ - Name: part, - IsTerminal: isTerminal, - AvailableTypes: types, - Branches: make(map[telemetrytypes.JSONAccessBranchType]*telemetrytypes.JSONAccessNode), - Parent: parent, - MaxDynamicTypes: maxTypes, - MaxDynamicPaths: maxPaths, - } - - hasJSON := slices.Contains(node.AvailableTypes, telemetrytypes.ArrayJSON) - hasDynamic := slices.Contains(node.AvailableTypes, telemetrytypes.ArrayDynamic) - - // Configure terminal if this is the last part - if isTerminal { - valueType, _ := inferDataType(pb.value, pb.op, pb.key) - node.TerminalConfig = &telemetrytypes.TerminalConfig{ - Key: pb.key, - ElemType: *pb.key.JSONDataType, - ValueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType], - } - } else { - var err error - if hasJSON { - node.Branches[telemetrytypes.BranchJSON], err = pb.buildPlan(index+1, node, false) - if err != nil { - return nil, err - } - } - if hasDynamic { - node.Branches[telemetrytypes.BranchDynamic], err = pb.buildPlan(index+1, node, true) - if err != nil { - return nil, err - } - } - } - - return node, nil -} - -// PlanJSON builds a tree structure representing the complete JSON path traversal -// that precomputes all possible branches and their types -func PlanJSON(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, op qbtypes.FilterOperator, - value any, - metadataStore telemetrytypes.MetadataStore, -) (telemetrytypes.JSONAccessPlan, error) { - // if path is empty, return nil - if key.Name == "" { - return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty") - } - - path := strings.ReplaceAll(key.Name, telemetrytypes.ArrayAnyIndex, telemetrytypes.ArraySep) - parts := strings.Split(path, telemetrytypes.ArraySep) - - // Pre-fetch JSON types for all path prefixes in a single metadata call to avoid - // multiple small DB queries during plan construction. - // Extract all path prefixes that will be needed during recursive buildPlan calls - selectors := make([]*telemetrytypes.FieldKeySelector, 0, len(parts)) - for i := range parts { - pathSoFar := strings.Join(parts[:i+1], telemetrytypes.ArraySep) - selectors = append(selectors, &telemetrytypes.FieldKeySelector{ - Name: pathSoFar, - SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeExact, - Signal: telemetrytypes.SignalLogs, - Limit: 1, - }) - } - - keys, _, err := metadataStore.GetKeysMulti(ctx, selectors) - if err != nil { - return nil, err - } - - // Build type cache from the batched results - typeCache := make(map[string][]telemetrytypes.JSONDataType, len(keys)) - for name, ks := range keys { - types := make([]telemetrytypes.JSONDataType, 0, len(ks)) - for _, k := range ks { - if k.JSONDataType != nil { - types = append(types, *k.JSONDataType) - } - } - typeCache[name] = types - } - - pb := &JSONAccessPlanBuilder{ - key: key, - op: op, - value: value, - parts: parts, - isPromoted: key.Materialized, - typeCache: typeCache, - } - plans := telemetrytypes.JSONAccessPlan{} - - node, err := pb.buildPlan(0, - telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, - 32, 0), - false, - ) - if err != nil { - return nil, err - } - plans = append(plans, node) - - // TODO: PlanJSON requires the Start and End of the Query to select correct column between promoted and body_json using - // creation time in distributed_promoted_paths - if pb.isPromoted { - node, err := pb.buildPlan(0, - telemetrytypes.NewRootJSONAccessNode(LogsV2BodyPromotedColumn, - 32, 1024), - true, - ) - if err != nil { - return nil, err - } - plans = append(plans, node) - } - - return plans, nil -} diff --git a/pkg/telemetrylogs/json_condition_builder.go b/pkg/telemetrylogs/json_condition_builder.go index 4bcff50da6..cc11867755 100644 --- a/pkg/telemetrylogs/json_condition_builder.go +++ b/pkg/telemetrylogs/json_condition_builder.go @@ -1,10 +1,8 @@ package telemetrylogs import ( - "context" "fmt" "slices" - "strings" "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/querybuilder" @@ -15,7 +13,7 @@ import ( var ( CodeCurrentNodeNil = errors.MustNewCode("current_node_nil") - CodeNextNodeNil = errors.MustNewCode("next_node_nil") + CodeChildNodeNil = errors.MustNewCode("child_node_nil") CodeNestedExpressionsEmpty = errors.MustNewCode("nested_expressions_empty") CodeGroupByPlanEmpty = errors.MustNewCode("group_by_plan_empty") CodeArrayMapExpressionsEmpty = errors.MustNewCode("array_map_expressions_empty") @@ -23,18 +21,21 @@ var ( CodeArrayNavigationFailed = errors.MustNewCode("array_navigation_failed") ) -// BuildCondition builds the full WHERE condition for body_json JSON paths -func (c *conditionBuilder) buildJSONCondition(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, - operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { +type jsonConditionBuilder struct { + key *telemetrytypes.TelemetryFieldKey + valueType telemetrytypes.JSONDataType +} - plan, err := PlanJSON(ctx, key, operator, value, c.metadataStore) - if err != nil { - return "", err - } +func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType telemetrytypes.FieldDataType) *jsonConditionBuilder { + return &jsonConditionBuilder{key: key, valueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType]} +} + +// BuildCondition builds the full WHERE condition for body_json JSON paths +func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { conditions := []string{} - for _, plan := range plan { - condition, err := c.emitPlannedCondition(plan, operator, value, sb) + for _, node := range c.key.JSONPlan { + condition, err := c.emitPlannedCondition(node, operator, value, sb) if err != nil { return "", err } @@ -44,9 +45,9 @@ func (c *conditionBuilder) buildJSONCondition(ctx context.Context, key *telemetr } // emitPlannedCondition handles paths with array traversal -func (c *conditionBuilder) emitPlannedCondition(plan *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { +func (c *jsonConditionBuilder) emitPlannedCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { // Build traversal + terminal recursively per-hop - compiled, err := c.recurseArrayHops(plan, operator, value, sb) + compiled, err := c.recurseArrayHops(node, operator, value, sb) if err != nil { return "", err } @@ -54,12 +55,12 @@ func (c *conditionBuilder) emitPlannedCondition(plan *telemetrytypes.JSONAccessN } // buildTerminalCondition creates the innermost condition -func (c *conditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { +func (c *jsonConditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { if node.TerminalConfig.ElemType.IsArray { conditions := []string{} // if the value type is not an array // TODO(piyush): Confirm the Query built for Array case and add testcases for it later - if !node.TerminalConfig.ValueType.IsArray { + if !c.valueType.IsArray { // if operator is a String search Operator, then we need to build one more String comparison condition along with the Strict match condition if operator.IsStringSearchOperator() { formattedValue := querybuilder.FormatValueForContains(value) @@ -93,7 +94,7 @@ func (c *conditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAcces // buildPrimitiveTerminalCondition builds the condition if the terminal node is a primitive type // it handles the data type collisions and utilizes indexes for the condition if available -func (c *conditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { +func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { fieldPath := node.FieldPath() conditions := []string{} var formattedValue any = value @@ -164,19 +165,19 @@ func (c *conditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes. } // buildArrayMembershipCondition handles array membership checks -func (c *conditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { +func (c *jsonConditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { arrayPath := node.FieldPath() localKeyCopy := *node.TerminalConfig.Key // create typed array out of a dynamic array filteredDynamicExpr := func() string { // Change the field data type from []dynamic to the value type // since we've filtered the value type out of the dynamic array, we need to change the field data corresponding to the value type - localKeyCopy.FieldDataType = telemetrytypes.MappingJSONDataTypeToFieldDataType[telemetrytypes.ScalerTypeToArrayType[node.TerminalConfig.ValueType]] + localKeyCopy.FieldDataType = telemetrytypes.MappingJSONDataTypeToFieldDataType[telemetrytypes.ScalerTypeToArrayType[c.valueType]] baseArrayDynamicExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", arrayPath) return fmt.Sprintf("arrayMap(x->dynamicElement(x, '%s'), arrayFilter(x->(dynamicType(x) = '%s'), %s))", - node.TerminalConfig.ValueType.StringValue(), - node.TerminalConfig.ValueType.StringValue(), + c.valueType.StringValue(), + c.valueType.StringValue(), baseArrayDynamicExpr) } typedArrayExpr := func() string { @@ -200,7 +201,7 @@ func (c *conditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JS } // recurseArrayHops recursively builds array traversal conditions -func (c *conditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { +func (c *jsonConditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) { if current == nil { return "", errors.NewInternalf(CodeArrayNavigationFailed, "navigation failed, current node is nil") } @@ -247,7 +248,7 @@ func (c *conditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAccessNo return sb.Or(branches...), nil } -func (c *conditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, fieldExpr string, operator qbtypes.FilterOperator, value any) (string, error) { +func (c *jsonConditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, fieldExpr string, operator qbtypes.FilterOperator, value any) (string, error) { switch operator { case qbtypes.FilterOperatorEqual: return sb.E(fieldExpr, value), nil @@ -304,176 +305,6 @@ func (c *conditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, fieldExpr } } -// GroupByArrayJoinInfo contains information about array joins needed for GroupBy -type GroupByArrayJoinInfo struct { - ArrayJoinClauses []string // ARRAY JOIN clauses to add to FROM clause - TerminalExpr string // Terminal field expression for SELECT/GROUP BY -} - -// BuildGroupBy builds GroupBy information for body JSON fields using arrayConcat pattern -// -// BuildGroupBy was designed to be used for group by queries on body JSON fields existings inside arrays but -// currently it is not used anywhere, considering this case suits more to Data Engineering instead of Observability space. -// This code should be removed in future. -func (c *conditionBuilder) BuildGroupBy(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*GroupByArrayJoinInfo, error) { - path := strings.TrimPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix) - - plan, err := PlanJSON(ctx, key, qbtypes.FilterOperatorExists, nil, c.metadataStore) - if err != nil { - return nil, err - } - - if len(plan) == 0 { - return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, - "Could not find any valid paths for: %s", path) - } - - if plan[0].IsTerminal { - node := plan[0] - - expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue()) - if key.Materialized { - if len(plan) < 2 { - return nil, errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing, - "plan length is less than 2 for promoted path: %s", path) - } - - // promoted column first then body_json column - // TODO(Piyush): Change this in future for better performance - expr = fmt.Sprintf("coalesce(%s, %s)", - fmt.Sprintf("dynamicElement(%s, '%s')", plan[1].FieldPath(), plan[1].TerminalConfig.ElemType.StringValue()), - expr, - ) - } - - return &GroupByArrayJoinInfo{ - ArrayJoinClauses: []string{}, - TerminalExpr: expr, - }, nil - } - - // Build arrayConcat pattern directly from the tree structure - arrayConcatExpr, err := c.buildArrayConcat(plan) - if err != nil { - return nil, err - } - - // Create single ARRAY JOIN clause with arrayFlatten - arrayJoinClause := fmt.Sprintf("ARRAY JOIN %s AS `%s`", arrayConcatExpr, key.Name) - - return &GroupByArrayJoinInfo{ - ArrayJoinClauses: []string{arrayJoinClause}, - TerminalExpr: fmt.Sprintf("`%s`", key.Name), - }, nil -} - -// buildArrayConcat builds the arrayConcat pattern directly from the tree structure -func (c *conditionBuilder) buildArrayConcat(plan telemetrytypes.JSONAccessPlan) (string, error) { - if len(plan) == 0 { - return "", errors.Newf(errors.TypeInternal, CodeGroupByPlanEmpty, "group by plan is empty while building arrayConcat") - } - - // Build arrayMap expressions for ALL available branches at the root level - var arrayMapExpressions []string - for _, node := range plan { - hasJSON := node.Branches[telemetrytypes.BranchJSON] != nil - hasDynamic := node.Branches[telemetrytypes.BranchDynamic] != nil - - if hasJSON { - jsonExpr, err := c.buildArrayMap(node, telemetrytypes.BranchJSON) - if err != nil { - return "", err - } - arrayMapExpressions = append(arrayMapExpressions, jsonExpr) - } - - if hasDynamic { - dynamicExpr, err := c.buildArrayMap(node, telemetrytypes.BranchDynamic) - if err != nil { - return "", err - } - arrayMapExpressions = append(arrayMapExpressions, dynamicExpr) - } - } - if len(arrayMapExpressions) == 0 { - return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat") - } - - // Build the arrayConcat expression - arrayConcatExpr := fmt.Sprintf("arrayConcat(%s)", strings.Join(arrayMapExpressions, ", ")) - - // Wrap with arrayFlatten - arrayFlattenExpr := fmt.Sprintf("arrayFlatten(%s)", arrayConcatExpr) - - return arrayFlattenExpr, nil -} - -// buildArrayMap builds the arrayMap expression for a specific branch, handling all sub-branches -func (c *conditionBuilder) buildArrayMap(currentNode *telemetrytypes.JSONAccessNode, branchType telemetrytypes.JSONAccessBranchType) (string, error) { - if currentNode == nil { - return "", errors.Newf(errors.TypeInternal, CodeCurrentNodeNil, "current node is nil while building arrayMap") - } - - nextNode := currentNode.Branches[branchType] - if nextNode == nil { - return "", errors.Newf(errors.TypeInternal, CodeNextNodeNil, "next node is nil while building arrayMap") - } - - // Build the array expression for this level - var arrayExpr string - if branchType == telemetrytypes.BranchJSON { - // Array(JSON) branch - arrayExpr = fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')", - currentNode.FieldPath(), currentNode.MaxDynamicTypes, currentNode.MaxDynamicPaths) - } else { - // Array(Dynamic) branch - filter for JSON objects - dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", currentNode.FieldPath()) - arrayExpr = fmt.Sprintf("arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr) - } - - // If this is the terminal level, return the simple arrayMap - if nextNode.IsTerminal { - dynamicElementExpr := fmt.Sprintf("dynamicElement(%s, '%s')", nextNode.FieldPath(), - nextNode.TerminalConfig.ElemType.StringValue(), - ) - return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), dynamicElementExpr, arrayExpr), nil - } - - // For non-terminal nodes, we need to handle ALL possible branches at the next level - var nestedExpressions []string - hasJSON := nextNode.Branches[telemetrytypes.BranchJSON] != nil - hasDynamic := nextNode.Branches[telemetrytypes.BranchDynamic] != nil - - if hasJSON { - jsonNested, err := c.buildArrayMap(nextNode, telemetrytypes.BranchJSON) - if err != nil { - return "", err - } - nestedExpressions = append(nestedExpressions, jsonNested) - } - - if hasDynamic { - dynamicNested, err := c.buildArrayMap(nextNode, telemetrytypes.BranchDynamic) - if err != nil { - return "", err - } - nestedExpressions = append(nestedExpressions, dynamicNested) - } - - // If we have multiple nested expressions, we need to concat them - var nestedExpr string - if len(nestedExpressions) == 1 { - nestedExpr = nestedExpressions[0] - } else if len(nestedExpressions) > 1 { - // This shouldn't happen in our current tree structure, but handle it just in case - nestedExpr = fmt.Sprintf("arrayConcat(%s)", strings.Join(nestedExpressions, ", ")) - } else { - return "", errors.Newf(errors.TypeInternal, CodeNestedExpressionsEmpty, "nested expressions are empty while building arrayMap") - } - - return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), nestedExpr, arrayExpr), nil -} - func assumeNotNull(column string, elemType telemetrytypes.JSONDataType) string { return fmt.Sprintf("assumeNotNull(dynamicElement(%s, '%s'))", column, elemType.StringValue()) } diff --git a/pkg/telemetrylogs/json_stmt_builder_test.go b/pkg/telemetrylogs/json_stmt_builder_test.go index df9b387bc7..44c967e887 100644 --- a/pkg/telemetrylogs/json_stmt_builder_test.go +++ b/pkg/telemetrylogs/json_stmt_builder_test.go @@ -137,7 +137,7 @@ func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) { }, }, expected: qbtypes.Statement{ - Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf((dynamicElement(body_json.`user.age`, 'Int64') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'Int64') IS NOT NULL), toString(coalesce(dynamicElement(body_json.`user.age`, 'Int64'), dynamicElement(body_json_promoted.`user.age`, 'Int64'))), (dynamicElement(body_json.`user.age`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json.`user.age`, 'String'), dynamicElement(body_json_promoted.`user.age`, 'String')), NULL)) AS `user.age`, toString(multiIf((dynamicElement(body_json.`user.name`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.name`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json.`user.name`, 'String'), dynamicElement(body_json_promoted.`user.name`, 'String')), NULL)) AS `user.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `user.age`, `user.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf((dynamicElement(body_json.`user.age`, 'Int64') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'Int64') IS NOT NULL), toString(coalesce(dynamicElement(body_json.`user.age`, 'Int64'), dynamicElement(body_json_promoted.`user.age`, 'Int64'))), (dynamicElement(body_json.`user.age`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json.`user.age`, 'String'), dynamicElement(body_json_promoted.`user.age`, 'String')), NULL)) AS `user.age`, toString(multiIf((dynamicElement(body_json.`user.name`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.name`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json.`user.name`, 'String'), dynamicElement(body_json_promoted.`user.name`, 'String')), NULL)) AS `user.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`user.age`, `user.name`) GLOBAL IN (SELECT `user.age`, `user.name` FROM __limit_cte) GROUP BY ts, `user.age`, `user.name`", + Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf((dynamicElement(body_json.`user.age`, 'Int64') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'Int64') IS NOT NULL), toString(coalesce(dynamicElement(body_json_promoted.`user.age`, 'Int64'), dynamicElement(body_json.`user.age`, 'Int64'))), (dynamicElement(body_json.`user.age`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json_promoted.`user.age`, 'String'), dynamicElement(body_json.`user.age`, 'String')), NULL)) AS `user.age`, toString(multiIf((dynamicElement(body_json.`user.name`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.name`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json_promoted.`user.name`, 'String'), dynamicElement(body_json.`user.name`, 'String')), NULL)) AS `user.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? GROUP BY `user.age`, `user.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf((dynamicElement(body_json.`user.age`, 'Int64') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'Int64') IS NOT NULL), toString(coalesce(dynamicElement(body_json_promoted.`user.age`, 'Int64'), dynamicElement(body_json.`user.age`, 'Int64'))), (dynamicElement(body_json.`user.age`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.age`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json_promoted.`user.age`, 'String'), dynamicElement(body_json.`user.age`, 'String')), NULL)) AS `user.age`, toString(multiIf((dynamicElement(body_json.`user.name`, 'String') IS NOT NULL OR dynamicElement(body_json_promoted.`user.name`, 'String') IS NOT NULL), coalesce(dynamicElement(body_json_promoted.`user.name`, 'String'), dynamicElement(body_json.`user.name`, 'String')), NULL)) AS `user.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? AND (`user.age`, `user.name`) GLOBAL IN (SELECT `user.age`, `user.name` FROM __limit_cte) GROUP BY ts, `user.age`, `user.name`", Args: []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448)}, }, }, @@ -159,6 +159,84 @@ func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) { } } +func TestStatementBuilderListQueryBodyHas(t *testing.T) { + enableBodyJSONQuery(t) + defer func() { + disableBodyJSONQuery(t) + }() + + statementBuilder := buildJSONTestStatementBuilder(t) + cases := []struct { + name string + requestType qbtypes.RequestType + query qbtypes.QueryBuilderQuery[qbtypes.LogAggregation] + expected qbtypes.Statement + expectedErr error + }{ + { + name: "Simple has filter", + requestType: qbtypes.RequestTypeRaw, + query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{ + Signal: telemetrytypes.SignalLogs, + Filter: &qbtypes.Filter{Expression: "has(body.education[].parameters, 1.65)"}, + Limit: 10, + }, + expected: qbtypes.Statement{ + Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, body_json, body_json_promoted, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (has(arrayFlatten(arrayConcat(arrayMap(`body_json.education`->dynamicElement(`body_json.education`.`parameters`, 'Array(Nullable(Float64))'), dynamicElement(body_json.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))')))), ?) OR has(arrayFlatten(arrayConcat(arrayMap(`body_json.education`->dynamicElement(`body_json.education`.`parameters`, 'Array(Dynamic)'), dynamicElement(body_json.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))')))), ?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?", + Args: []any{uint64(1747945619), uint64(1747983448), 1.65, 1.65, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10}, + Warnings: []string{ + "Key `education[].parameters` is ambiguous, found 2 different combinations of field context / data type: [name=education[].parameters,context=body,datatype=[]float64,jsondatatype=Array(Nullable(Float64)) name=education[].parameters,context=body,datatype=[]dynamic,jsondatatype=Array(Dynamic)].", + }, + }, + expectedErr: nil, + }, + { + name: "Flat path hasAll filter", + requestType: qbtypes.RequestTypeRaw, + query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{ + Signal: telemetrytypes.SignalLogs, + Filter: &qbtypes.Filter{Expression: "hasAll(body.user.permissions, ['read', 'write'])"}, + Limit: 10, + }, + expected: qbtypes.Statement{ + Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, body_json, body_json_promoted, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND hasAll(dynamicElement(body_json.`user.permissions`, 'Array(Nullable(String))'), ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?", + Args: []any{uint64(1747945619), uint64(1747983448), []any{[]any{"read", "write"}}, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10}, + }, + expectedErr: nil, + }, + { + name: "Nested path hasAny filter", + requestType: qbtypes.RequestTypeRaw, + query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{ + Signal: telemetrytypes.SignalLogs, + Filter: &qbtypes.Filter{Expression: "hasAny(education[].awards[].participated[].members, ['Piyush', 'Tushar'])"}, + Limit: 10, + }, + expected: qbtypes.Statement{ + Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body, body_json, body_json_promoted, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND hasAny(arrayFlatten(arrayConcat(arrayMap(`body_json.education`->arrayConcat(arrayMap(`body_json.education[].awards`->arrayConcat(arrayMap(`body_json.education[].awards[].participated`->dynamicElement(`body_json.education[].awards[].participated`.`members`, 'Array(Nullable(String))'), dynamicElement(`body_json.education[].awards`.`participated`, 'Array(JSON(max_dynamic_types=4, max_dynamic_paths=0))')), arrayMap(`body_json.education[].awards[].participated`->dynamicElement(`body_json.education[].awards[].participated`.`members`, 'Array(Nullable(String))'), arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_json.education[].awards`.`participated`, 'Array(Dynamic)'))))), dynamicElement(`body_json.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')), arrayMap(`body_json.education[].awards`->arrayConcat(arrayMap(`body_json.education[].awards[].participated`->dynamicElement(`body_json.education[].awards[].participated`.`members`, 'Array(Nullable(String))'), dynamicElement(`body_json.education[].awards`.`participated`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=256))')), arrayMap(`body_json.education[].awards[].participated`->dynamicElement(`body_json.education[].awards[].participated`.`members`, 'Array(Nullable(String))'), arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_json.education[].awards`.`participated`, 'Array(Dynamic)'))))), arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_json.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_json.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))')))), ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?", + Args: []any{uint64(1747945619), uint64(1747983448), []any{[]any{"Piyush", "Tushar"}}, "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10}, + }, + expectedErr: nil, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + + q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil) + if c.expectedErr != nil { + require.Error(t, err) + require.Contains(t, err.Error(), c.expectedErr.Error()) + } else { + require.NoError(t, err) + require.Equal(t, c.expected.Query, q.Query) + require.Equal(t, c.expected.Args, q.Args) + require.Equal(t, c.expected.Warnings, q.Warnings) + } + }) + } +} + func TestStatementBuilderListQueryBody(t *testing.T) { enableBodyJSONQuery(t) defer func() { @@ -674,10 +752,10 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) { } } -func buildTestTelemetryMetadataStore(promotedPaths ...string) *telemetrytypestest.MockMetadataStore { +func buildTestTelemetryMetadataStore(t *testing.T, promotedPaths ...string) *telemetrytypestest.MockMetadataStore { mockMetadataStore := telemetrytypestest.NewMockMetadataStore() - types, _ := testTypeSet() + types, _ := telemetrytypes.TestJSONTypeSet() for path, jsonTypes := range types { promoted := false @@ -698,6 +776,11 @@ func buildTestTelemetryMetadataStore(promotedPaths ...string) *telemetrytypestes JSONDataType: &jsonType, Materialized: promoted, } + err := key.SetJSONAccessPlan(telemetrytypes.JSONColumnMetadata{ + BaseColumn: LogsV2BodyJSONColumn, + PromotedColumn: LogsV2BodyPromotedColumn, + }, types) + require.NoError(t, err) mockMetadataStore.SetKey(key) } } @@ -705,10 +788,10 @@ func buildTestTelemetryMetadataStore(promotedPaths ...string) *telemetrytypestes return mockMetadataStore } -func buildJSONTestStatementBuilder(_ *testing.T, promotedPaths ...string) *logQueryStatementBuilder { +func buildJSONTestStatementBuilder(t *testing.T, promotedPaths ...string) *logQueryStatementBuilder { + mockMetadataStore := buildTestTelemetryMetadataStore(t, promotedPaths...) fm := NewFieldMapper() - mockMetadataStore := buildTestTelemetryMetadataStore(promotedPaths...) - cb := NewConditionBuilder(fm, mockMetadataStore) + cb := NewConditionBuilder(fm) aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil) resourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder( diff --git a/pkg/telemetrylogs/json_string.go b/pkg/telemetrylogs/json_string.go index c81243548c..f98b79bd75 100644 --- a/pkg/telemetrylogs/json_string.go +++ b/pkg/telemetrylogs/json_string.go @@ -40,14 +40,14 @@ func parseStrValue(valueStr string, operator qbtypes.FilterOperator) (telemetryt return valueType, parsedValue } -func inferDataType(value any, operator qbtypes.FilterOperator, key *telemetrytypes.TelemetryFieldKey) (telemetrytypes.FieldDataType, any) { +func InferDataType(value any, operator qbtypes.FilterOperator, key *telemetrytypes.TelemetryFieldKey) (telemetrytypes.FieldDataType, any) { // check if the value is a int, float, string, bool valueType := telemetrytypes.FieldDataTypeUnspecified switch v := value.(type) { case []any: // take the first element and infer the type if len(v) > 0 { - valueType, _ = inferDataType(v[0], operator, key) + valueType, _ = InferDataType(v[0], operator, key) } return valueType, v case uint8, uint16, uint32, uint64, int, int8, int16, int32, int64: @@ -84,7 +84,7 @@ func getBodyJSONPath(key *telemetrytypes.TelemetryFieldKey) string { } func GetBodyJSONKey(_ context.Context, key *telemetrytypes.TelemetryFieldKey, operator qbtypes.FilterOperator, value any) (string, any) { - dataType, value := inferDataType(value, operator, key) + dataType, value := InferDataType(value, operator, key) // for array types, we need to extract the value from the JSON_QUERY if dataType == telemetrytypes.FieldDataTypeArrayInt64 || diff --git a/pkg/telemetrylogs/stmt_builder_test.go b/pkg/telemetrylogs/stmt_builder_test.go index 69415e18da..2f3f15023a 100644 --- a/pkg/telemetrylogs/stmt_builder_test.go +++ b/pkg/telemetrylogs/stmt_builder_test.go @@ -195,10 +195,10 @@ func TestStatementBuilderTimeSeries(t *testing.T) { }, } - fm := NewFieldMapper() mockMetadataStore := telemetrytypestest.NewMockMetadataStore() mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() - cb := NewConditionBuilder(fm, mockMetadataStore) + fm := NewFieldMapper() + cb := NewConditionBuilder(fm) aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil) @@ -315,10 +315,10 @@ func TestStatementBuilderListQuery(t *testing.T) { }, } - fm := NewFieldMapper() mockMetadataStore := telemetrytypestest.NewMockMetadataStore() mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() - cb := NewConditionBuilder(fm, mockMetadataStore) + fm := NewFieldMapper() + cb := NewConditionBuilder(fm) aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil) @@ -423,10 +423,10 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) { }, } - fm := NewFieldMapper() mockMetadataStore := telemetrytypestest.NewMockMetadataStore() mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() - cb := NewConditionBuilder(fm, mockMetadataStore) + fm := NewFieldMapper() + cb := NewConditionBuilder(fm) aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil) @@ -499,10 +499,10 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) { }, } - fm := NewFieldMapper() mockMetadataStore := telemetrytypestest.NewMockMetadataStore() mockMetadataStore.KeysMap = buildCompleteFieldKeyMap() - cb := NewConditionBuilder(fm, mockMetadataStore) + fm := NewFieldMapper() + cb := NewConditionBuilder(fm) aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil) @@ -594,10 +594,10 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) { }, } - fm := NewFieldMapper() mockMetadataStore := telemetrytypestest.NewMockMetadataStore() mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision() - cb := NewConditionBuilder(fm, mockMetadataStore) + fm := NewFieldMapper() + cb := NewConditionBuilder(fm) aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil) @@ -819,7 +819,7 @@ func TestAdjustKey(t *testing.T) { fm := NewFieldMapper() mockMetadataStore := telemetrytypestest.NewMockMetadataStore() mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision() - cb := NewConditionBuilder(fm, mockMetadataStore) + cb := NewConditionBuilder(fm) aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil) diff --git a/pkg/telemetrymetadata/body_json_metadata.go b/pkg/telemetrymetadata/body_json_metadata.go index c9bc2a4b10..32de6d7493 100644 --- a/pkg/telemetrymetadata/body_json_metadata.go +++ b/pkg/telemetrymetadata/body_json_metadata.go @@ -38,7 +38,7 @@ var ( CodeFailedToAppendPath = errors.MustNewCode("failed_to_append_path_promoted_paths") ) -// GetBodyJSONPaths extracts body JSON paths from the path_types table +// fetchBodyJSONPaths extracts body JSON paths from the path_types table // This function can be used by both JSONQueryBuilder and metadata extraction // uniquePathLimit: 0 for no limit, >0 for maximum number of unique paths to return // - For startup load: set to 10000 to get top 10k unique paths @@ -46,14 +46,12 @@ var ( // - For metadata API: set to desired pagination limit // // searchOperator: LIKE for pattern matching, EQUAL for exact match -// Returns: (paths, error) -func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context, - fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) { - +func (t *telemetryMetaStore) fetchBodyJSONPaths(ctx context.Context, + fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, []string, bool, error) { query, args, limit := buildGetBodyJSONPathsQuery(fieldKeySelectors) rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...) if err != nil { - return nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to extract body JSON keys") + return nil, nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to extract body JSON keys") } defer rows.Close() @@ -67,7 +65,7 @@ func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context, err = rows.Scan(&path, &typesArray, &lastSeen) if err != nil { - return nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to scan body JSON key row") + return nil, nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to scan body JSON key row") } for _, typ := range typesArray { @@ -89,7 +87,18 @@ func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context, rowCount++ } if rows.Err() != nil { - return nil, false, errors.WrapInternalf(rows.Err(), CodeFailIterateBodyJSONKeys, "error iterating body JSON keys") + return nil, nil, false, errors.WrapInternalf(rows.Err(), CodeFailIterateBodyJSONKeys, "error iterating body JSON keys") + } + + return fieldKeys, paths, rowCount <= limit, nil +} + +func (t *telemetryMetaStore) buildBodyJSONPaths(ctx context.Context, + fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) { + + fieldKeys, paths, finished, err := t.fetchBodyJSONPaths(ctx, fieldKeySelectors) + if err != nil { + return nil, false, err } promoted, err := t.GetPromotedPaths(ctx, paths...) @@ -107,7 +116,34 @@ func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context, fieldKey.Indexes = indexes[fieldKey.Name] } - return fieldKeys, rowCount <= limit, nil + return fieldKeys, finished, t.buildJSONPlans(ctx, fieldKeys) +} + +func (t *telemetryMetaStore) buildJSONPlans(ctx context.Context, keys []*telemetrytypes.TelemetryFieldKey) error { + parentSelectors := make([]*telemetrytypes.FieldKeySelector, 0, len(keys)) + for _, key := range keys { + parentSelectors = append(parentSelectors, key.ArrayParentSelectors()...) + } + + parentKeys, _, _, err := t.fetchBodyJSONPaths(ctx, parentSelectors) + if err != nil { + return err + } + + typeCache := make(map[string][]telemetrytypes.JSONDataType) + for _, key := range parentKeys { + typeCache[key.Name] = append(typeCache[key.Name], *key.JSONDataType) + } + + // build plans for keys now + for _, key := range keys { + err = key.SetJSONAccessPlan(t.jsonColumnMetadata[telemetrytypes.SignalLogs][telemetrytypes.FieldContextBody], typeCache) + if err != nil { + return err + } + } + + return nil } func buildGetBodyJSONPathsQuery(fieldKeySelectors []*telemetrytypes.FieldKeySelector) (string, []any, int) { diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go index e4c627247c..f0f725012c 100644 --- a/pkg/telemetrymetadata/metadata.go +++ b/pkg/telemetrymetadata/metadata.go @@ -49,8 +49,9 @@ type telemetryMetaStore struct { relatedMetadataDBName string relatedMetadataTblName string - fm qbtypes.FieldMapper - conditionBuilder qbtypes.ConditionBuilder + fm qbtypes.FieldMapper + conditionBuilder qbtypes.ConditionBuilder + jsonColumnMetadata map[telemetrytypes.Signal]map[telemetrytypes.FieldContext]telemetrytypes.JSONColumnMetadata } func escapeForLike(s string) string { @@ -96,6 +97,14 @@ func NewTelemetryMetaStore( logResourceKeysTblName: logResourceKeysTblName, relatedMetadataDBName: relatedMetadataDBName, relatedMetadataTblName: relatedMetadataTblName, + jsonColumnMetadata: map[telemetrytypes.Signal]map[telemetrytypes.FieldContext]telemetrytypes.JSONColumnMetadata{ + telemetrytypes.SignalLogs: { + telemetrytypes.FieldContextBody: telemetrytypes.JSONColumnMetadata{ + BaseColumn: telemetrylogs.LogsV2BodyJSONColumn, + PromotedColumn: telemetrylogs.LogsV2BodyPromotedColumn, + }, + }, + }, } fm := NewFieldMapper() @@ -547,7 +556,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors } if querybuilder.BodyJSONQueryEnabled { - bodyJSONPaths, finished, err := t.getBodyJSONPaths(ctx, fieldKeySelectors) // LIKE for pattern matching + bodyJSONPaths, finished, err := t.buildBodyJSONPaths(ctx, fieldKeySelectors) // LIKE for pattern matching if err != nil { t.logger.ErrorContext(ctx, "failed to extract body JSON paths", "error", err) } diff --git a/pkg/types/telemetrytypes/field.go b/pkg/types/telemetrytypes/field.go index f45209533d..b0470748c4 100644 --- a/pkg/types/telemetrytypes/field.go +++ b/pkg/types/telemetrytypes/field.go @@ -23,7 +23,8 @@ const ( // e.g., "body.status" where "body." is the prefix BodyJSONStringSearchPrefix = "body." ArraySep = jsontypeexporter.ArraySeparator - ArrayAnyIndex = "[*]." + // TODO(Piyush): Remove once we've migrated to the new array syntax + ArrayAnyIndex = "[*]." ) type TelemetryFieldKey struct { @@ -35,10 +36,46 @@ type TelemetryFieldKey struct { FieldDataType FieldDataType `json:"fieldDataType,omitempty"` JSONDataType *JSONDataType `json:"-"` + JSONPlan JSONAccessPlan `json:"-"` Indexes []JSONDataTypeIndex `json:"-"` Materialized bool `json:"-"` // refers to promoted in case of body.... fields } +func (f *TelemetryFieldKey) KeyNameContainsArray() bool { + return strings.Contains(f.Name, ArraySep) || strings.Contains(f.Name, ArrayAnyIndex) +} + +// ArrayPathSegments returns just the individual segments of the path +// e.g., "education[].awards[].type" -> ["education", "awards", "type"] +func (f *TelemetryFieldKey) ArrayPathSegments() []string { + return strings.Split(strings.ReplaceAll(f.Name, ArrayAnyIndex, ArraySep), ArraySep) +} + +func (f *TelemetryFieldKey) ArrayParentPaths() []string { + parts := f.ArrayPathSegments() + paths := make([]string, 0, len(parts)) + for i := range parts { + paths = append(paths, strings.Join(parts[:i+1], ArraySep)) + } + return paths +} + +func (f *TelemetryFieldKey) ArrayParentSelectors() []*FieldKeySelector { + paths := f.ArrayParentPaths() + selectors := make([]*FieldKeySelector, 0, len(paths)) + for i := range paths { + selectors = append(selectors, &FieldKeySelector{ + Name: paths[i], + SelectorMatchType: FieldSelectorMatchTypeExact, + Signal: f.Signal, + FieldContext: f.FieldContext, + Limit: 1, + }) + } + + return selectors +} + func (f TelemetryFieldKey) String() string { var sb strings.Builder sb.WriteString(fmt.Sprintf("name=%s", f.Name)) diff --git a/pkg/types/telemetrytypes/field_datatype.go b/pkg/types/telemetrytypes/field_datatype.go index 27afa25cf4..c1b1cb0640 100644 --- a/pkg/types/telemetrytypes/field_datatype.go +++ b/pkg/types/telemetrytypes/field_datatype.go @@ -102,6 +102,10 @@ func (f FieldDataType) CHDataType() string { return "String" } +func (f FieldDataType) IsArray() bool { + return strings.HasPrefix(f.StringValue(), "[]") || strings.HasSuffix(f.StringValue(), "[]") +} + // UnmarshalJSON implements the json.Unmarshaler interface func (f *FieldDataType) UnmarshalJSON(data []byte) error { var str string diff --git a/pkg/types/telemetrytypes/json_access_plan.go b/pkg/types/telemetrytypes/json_access_plan.go index b4234ba8e7..03a5454b06 100644 --- a/pkg/types/telemetrytypes/json_access_plan.go +++ b/pkg/types/telemetrytypes/json_access_plan.go @@ -2,9 +2,12 @@ package telemetrytypes import ( "fmt" + "maps" + "slices" "strings" "github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter" + "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/valuer" ) @@ -15,14 +18,20 @@ type JSONAccessBranchType struct { var ( BranchJSON = JSONAccessBranchType{valuer.NewString("json")} BranchDynamic = JSONAccessBranchType{valuer.NewString("dynamic")} + + CodePlanIndexOutOfBounds = errors.MustNewCode("plan_index_out_of_bounds") ) +type JSONColumnMetadata struct { + BaseColumn string + PromotedColumn string +} + type JSONAccessPlan = []*JSONAccessNode type TerminalConfig struct { - Key *TelemetryFieldKey - ElemType JSONDataType - ValueType JSONDataType + Key *TelemetryFieldKey + ElemType JSONDataType } // Node is now a tree structure representing the complete JSON path traversal @@ -80,3 +89,133 @@ func (n *JSONAccessNode) FieldPath() string { key := "`" + n.Name + "`" return n.Parent.Alias() + "." + key } + +func (n *JSONAccessNode) BranchesInOrder() []JSONAccessBranchType { + return slices.SortedFunc(maps.Keys(n.Branches), func(a, b JSONAccessBranchType) int { + return strings.Compare(b.StringValue(), a.StringValue()) + }) +} + +type planBuilder struct { + key *TelemetryFieldKey + paths []string // cumulative paths for type cache lookups + segments []string // individual path segments for node names + isPromoted bool + typeCache map[string][]JSONDataType +} + +// buildPlan recursively builds the path plan tree +func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChild bool) (*JSONAccessNode, error) { + if index >= len(pb.paths) { + return nil, errors.NewInvalidInputf(CodePlanIndexOutOfBounds, "index is out of bounds") + } + + pathSoFar := pb.paths[index] // cumulative path for type cache lookup + segmentName := pb.segments[index] // segment name for node + isTerminal := index == len(pb.paths)-1 + + // Calculate progression parameters based on parent's values + var maxTypes, maxPaths int + if isDynArrChild { + // Child of Dynamic array - reset progression to base values (16, 256) + // This happens when we switch from Array(Dynamic) to Array(JSON) + maxTypes = 16 + maxPaths = 256 + } else if parent != nil { + // Child of JSON array - use parent's progression divided by 2 and 4 + maxTypes = parent.MaxDynamicTypes / 2 + maxPaths = parent.MaxDynamicPaths / 4 + if maxTypes < 0 { + maxTypes = 0 + } + if maxPaths < 0 { + maxPaths = 0 + } + } + + // Use cached types from the batched metadata query + types, ok := pb.typeCache[pathSoFar] + if !ok { + return nil, errors.NewInternalf(errors.CodeInvalidInput, "types missing for path %s", pathSoFar) + } + + // Create node for this path segment + node := &JSONAccessNode{ + Name: segmentName, + IsTerminal: isTerminal, + AvailableTypes: types, + Branches: make(map[JSONAccessBranchType]*JSONAccessNode), + Parent: parent, + MaxDynamicTypes: maxTypes, + MaxDynamicPaths: maxPaths, + } + + hasJSON := slices.Contains(node.AvailableTypes, ArrayJSON) + hasDynamic := slices.Contains(node.AvailableTypes, ArrayDynamic) + + // Configure terminal if this is the last part + if isTerminal { + node.TerminalConfig = &TerminalConfig{ + Key: pb.key, + ElemType: *pb.key.JSONDataType, + } + } else { + var err error + if hasJSON { + node.Branches[BranchJSON], err = pb.buildPlan(index+1, node, false) + if err != nil { + return nil, err + } + } + if hasDynamic { + node.Branches[BranchDynamic], err = pb.buildPlan(index+1, node, true) + if err != nil { + return nil, err + } + } + } + + return node, nil +} + +// buildJSONAccessPlan builds a tree structure representing the complete JSON path traversal +// that precomputes all possible branches and their types +func (key *TelemetryFieldKey) SetJSONAccessPlan(columnInfo JSONColumnMetadata, typeCache map[string][]JSONDataType, +) error { + // if path is empty, return nil + if key.Name == "" { + return errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty") + } + + pb := &planBuilder{ + key: key, + paths: key.ArrayParentPaths(), + segments: key.ArrayPathSegments(), + isPromoted: key.Materialized, + typeCache: typeCache, + } + + node, err := pb.buildPlan(0, + NewRootJSONAccessNode(columnInfo.BaseColumn, + 32, 0), + false, + ) + if err != nil { + return err + } + key.JSONPlan = append(key.JSONPlan, node) + + if pb.isPromoted { + node, err := pb.buildPlan(0, + NewRootJSONAccessNode(columnInfo.PromotedColumn, + 32, 1024), + true, + ) + if err != nil { + return err + } + key.JSONPlan = append(key.JSONPlan, node) + } + + return nil +} diff --git a/pkg/telemetrylogs/json_access_pb_test.go b/pkg/types/telemetrytypes/json_access_plan_test.go similarity index 59% rename from pkg/telemetrylogs/json_access_pb_test.go rename to pkg/types/telemetrytypes/json_access_plan_test.go index 5e5a9332dc..fbf3049218 100644 --- a/pkg/telemetrylogs/json_access_pb_test.go +++ b/pkg/types/telemetrytypes/json_access_plan_test.go @@ -1,12 +1,8 @@ -package telemetrylogs +package telemetrytypes import ( - "context" "testing" - qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" - "github.com/SigNoz/signoz/pkg/types/telemetrytypes" - "github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" ) @@ -16,8 +12,8 @@ import ( // ============================================================================ // makeKey creates a TelemetryFieldKey for testing -func makeKey(name string, dataType telemetrytypes.JSONDataType, materialized bool) *telemetrytypes.TelemetryFieldKey { - return &telemetrytypes.TelemetryFieldKey{ +func makeKey(name string, dataType JSONDataType, materialized bool) *TelemetryFieldKey { + return &TelemetryFieldKey{ Name: name, JSONDataType: &dataType, Materialized: materialized, @@ -38,14 +34,13 @@ type jsonAccessTestNode struct { MaxDynamicTypes int `yaml:"maxDynamicTypes,omitempty"` MaxDynamicPaths int `yaml:"maxDynamicPaths,omitempty"` ElemType string `yaml:"elemType,omitempty"` - ValueType string `yaml:"valueType,omitempty"` AvailableTypes []string `yaml:"availableTypes,omitempty"` Branches map[string]*jsonAccessTestNode `yaml:"branches,omitempty"` } // toTestNode converts a JSONAccessNode tree into jsonAccessTestNode so that // it can be serialized to YAML for easy visual comparison in tests. -func toTestNode(n *telemetrytypes.JSONAccessNode) *jsonAccessTestNode { +func toTestNode(n *JSONAccessNode) *jsonAccessTestNode { if n == nil { return nil } @@ -74,7 +69,6 @@ func toTestNode(n *telemetrytypes.JSONAccessNode) *jsonAccessTestNode { // Terminal config if n.TerminalConfig != nil { out.ElemType = n.TerminalConfig.ElemType.StringValue() - out.ValueType = n.TerminalConfig.ValueType.StringValue() } // Branches @@ -90,7 +84,7 @@ func toTestNode(n *telemetrytypes.JSONAccessNode) *jsonAccessTestNode { // plansToYAML converts a slice of JSONAccessNode plans to a YAML string that // can be compared against a per-test expectedTree. -func plansToYAML(t *testing.T, plans []*telemetrytypes.JSONAccessNode) string { +func plansToYAML(t *testing.T, plans []*JSONAccessNode) string { t.Helper() testNodes := make([]*jsonAccessTestNode, 0, len(plans)) @@ -110,17 +104,17 @@ func plansToYAML(t *testing.T, plans []*telemetrytypes.JSONAccessNode) string { func TestNode_Alias(t *testing.T) { tests := []struct { name string - node *telemetrytypes.JSONAccessNode + node *JSONAccessNode expected string }{ { name: "Root node returns name as-is", - node: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0), - expected: LogsV2BodyJSONColumn, + node: NewRootJSONAccessNode("body_json", 32, 0), + expected: "body_json", }, { name: "Node without parent returns backticked name", - node: &telemetrytypes.JSONAccessNode{ + node: &JSONAccessNode{ Name: "user", Parent: nil, }, @@ -128,36 +122,36 @@ func TestNode_Alias(t *testing.T) { }, { name: "Node with root parent uses dot separator", - node: &telemetrytypes.JSONAccessNode{ + node: &JSONAccessNode{ Name: "age", - Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0), + Parent: NewRootJSONAccessNode("body_json", 32, 0), }, - expected: "`" + LogsV2BodyJSONColumn + ".age`", + expected: "`" + "body_json" + ".age`", }, { name: "Node with non-root parent uses array separator", - node: &telemetrytypes.JSONAccessNode{ + node: &JSONAccessNode{ Name: "name", - Parent: &telemetrytypes.JSONAccessNode{ + Parent: &JSONAccessNode{ Name: "education", - Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0), + Parent: NewRootJSONAccessNode("body_json", 32, 0), }, }, - expected: "`" + LogsV2BodyJSONColumn + ".education[].name`", + expected: "`" + "body_json" + ".education[].name`", }, { name: "Nested array path with multiple levels", - node: &telemetrytypes.JSONAccessNode{ + node: &JSONAccessNode{ Name: "type", - Parent: &telemetrytypes.JSONAccessNode{ + Parent: &JSONAccessNode{ Name: "awards", - Parent: &telemetrytypes.JSONAccessNode{ + Parent: &JSONAccessNode{ Name: "education", - Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0), + Parent: NewRootJSONAccessNode("body_json", 32, 0), }, }, }, - expected: "`" + LogsV2BodyJSONColumn + ".education[].awards[].type`", + expected: "`" + "body_json" + ".education[].awards[].type`", }, } @@ -172,49 +166,49 @@ func TestNode_Alias(t *testing.T) { func TestNode_FieldPath(t *testing.T) { tests := []struct { name string - node *telemetrytypes.JSONAccessNode + node *JSONAccessNode expected string }{ { name: "Simple field path from root", - node: &telemetrytypes.JSONAccessNode{ + node: &JSONAccessNode{ Name: "user", - Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0), + Parent: NewRootJSONAccessNode("body_json", 32, 0), }, // FieldPath() always wraps the field name in backticks - expected: LogsV2BodyJSONColumn + ".`user`", + expected: "body_json" + ".`user`", }, { name: "Field path with backtick-required key", - node: &telemetrytypes.JSONAccessNode{ + node: &JSONAccessNode{ Name: "user-name", // requires backtick - Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0), + Parent: NewRootJSONAccessNode("body_json", 32, 0), }, - expected: LogsV2BodyJSONColumn + ".`user-name`", + expected: "body_json" + ".`user-name`", }, { name: "Nested field path", - node: &telemetrytypes.JSONAccessNode{ + node: &JSONAccessNode{ Name: "age", - Parent: &telemetrytypes.JSONAccessNode{ + Parent: &JSONAccessNode{ Name: "user", - Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0), + Parent: NewRootJSONAccessNode("body_json", 32, 0), }, }, // FieldPath() always wraps the field name in backticks - expected: "`" + LogsV2BodyJSONColumn + ".user`.`age`", + expected: "`" + "body_json" + ".user`.`age`", }, { name: "Array element field path", - node: &telemetrytypes.JSONAccessNode{ + node: &JSONAccessNode{ Name: "name", - Parent: &telemetrytypes.JSONAccessNode{ + Parent: &JSONAccessNode{ Name: "education", - Parent: telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn, 32, 0), + Parent: NewRootJSONAccessNode("body_json", 32, 0), }, }, // FieldPath() always wraps the field name in backticks - expected: "`" + LogsV2BodyJSONColumn + ".education`.`name`", + expected: "`" + "body_json" + ".education`.`name`", }, } @@ -231,17 +225,17 @@ func TestNode_FieldPath(t *testing.T) { // ============================================================================ func TestPlanJSON_BasicStructure(t *testing.T) { - _, metadataStore := testTypeSet() + types, _ := TestJSONTypeSet() tests := []struct { name string - key *telemetrytypes.TelemetryFieldKey + key *TelemetryFieldKey expectErr bool expectedYAML string }{ { name: "Simple path not promoted", - key: makeKey("user.name", telemetrytypes.String, false), + key: makeKey("user.name", String, false), expectedYAML: ` - name: user.name column: body_json @@ -250,12 +244,11 @@ func TestPlanJSON_BasicStructure(t *testing.T) { maxDynamicTypes: 16 isTerminal: true elemType: String - valueType: String `, }, { name: "Simple path promoted", - key: makeKey("user.name", telemetrytypes.String, true), + key: makeKey("user.name", String, true), expectedYAML: ` - name: user.name column: body_json @@ -264,7 +257,6 @@ func TestPlanJSON_BasicStructure(t *testing.T) { maxDynamicTypes: 16 isTerminal: true elemType: String - valueType: String - name: user.name column: body_json_promoted availableTypes: @@ -273,12 +265,11 @@ func TestPlanJSON_BasicStructure(t *testing.T) { maxDynamicPaths: 256 isTerminal: true elemType: String - valueType: String `, }, { name: "Empty path returns error", - key: makeKey("", telemetrytypes.String, false), + key: makeKey("", String, false), expectErr: true, expectedYAML: "", }, @@ -286,21 +277,24 @@ func TestPlanJSON_BasicStructure(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - plans, err := PlanJSON(context.Background(), tt.key, qbtypes.FilterOperatorEqual, "John", metadataStore) + err := tt.key.SetJSONAccessPlan(JSONColumnMetadata{ + BaseColumn: "body_json", + PromotedColumn: "body_json_promoted", + }, types) if tt.expectErr { require.Error(t, err) - require.Nil(t, plans) + require.Nil(t, tt.key.JSONPlan) return } require.NoError(t, err) - got := plansToYAML(t, plans) + got := plansToYAML(t, tt.key.JSONPlan) require.YAMLEq(t, tt.expectedYAML, got) }) } } func TestPlanJSON_ArrayPaths(t *testing.T) { - _, metadataStore := testTypeSet() + types, _ := TestJSONTypeSet() tests := []struct { name string @@ -324,7 +318,6 @@ func TestPlanJSON_ArrayPaths(t *testing.T) { maxDynamicTypes: 8 isTerminal: true elemType: String - valueType: String `, }, { @@ -351,7 +344,6 @@ func TestPlanJSON_ArrayPaths(t *testing.T) { maxDynamicTypes: 4 isTerminal: true elemType: String - valueType: String dynamic: name: type availableTypes: @@ -360,7 +352,6 @@ func TestPlanJSON_ArrayPaths(t *testing.T) { maxDynamicPaths: 256 isTerminal: true elemType: String - valueType: String `, }, { @@ -408,7 +399,6 @@ func TestPlanJSON_ArrayPaths(t *testing.T) { - String isTerminal: true elemType: String - valueType: String `, }, { @@ -428,34 +418,38 @@ func TestPlanJSON_ArrayPaths(t *testing.T) { maxDynamicTypes: 8 isTerminal: true elemType: String - valueType: String `, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - key := makeKey(tt.path, telemetrytypes.String, false) - plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, "John", metadataStore) + key := makeKey(tt.path, String, false) + err := key.SetJSONAccessPlan(JSONColumnMetadata{ + BaseColumn: "body_json", + PromotedColumn: "body_json_promoted", + }, types) require.NoError(t, err) - require.NotNil(t, plans) - require.Len(t, plans, 1) - got := plansToYAML(t, plans) + require.NotNil(t, key.JSONPlan) + require.Len(t, key.JSONPlan, 1) + got := plansToYAML(t, key.JSONPlan) require.YAMLEq(t, tt.expectedYAML, got) }) } } func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) { - _, metadataStore := testTypeSet() + types, _ := TestJSONTypeSet() path := "education[].awards[].type" - value := "sports" t.Run("Non-promoted plan", func(t *testing.T) { - key := makeKey(path, telemetrytypes.String, false) - plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, value, metadataStore) + key := makeKey(path, String, false) + err := key.SetJSONAccessPlan(JSONColumnMetadata{ + BaseColumn: "body_json", + PromotedColumn: "body_json_promoted", + }, types) require.NoError(t, err) - require.Len(t, plans, 1) + require.Len(t, key.JSONPlan, 1) expectedYAML := ` - name: education @@ -478,7 +472,6 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) { maxDynamicTypes: 4 isTerminal: true elemType: String - valueType: String dynamic: name: type availableTypes: @@ -487,17 +480,19 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) { maxDynamicPaths: 256 isTerminal: true elemType: String - valueType: String ` - got := plansToYAML(t, plans) + got := plansToYAML(t, key.JSONPlan) require.YAMLEq(t, expectedYAML, got) }) t.Run("Promoted plan", func(t *testing.T) { - key := makeKey(path, telemetrytypes.String, true) - plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, value, metadataStore) + key := makeKey(path, String, true) + err := key.SetJSONAccessPlan(JSONColumnMetadata{ + BaseColumn: "body_json", + PromotedColumn: "body_json_promoted", + }, types) require.NoError(t, err) - require.Len(t, plans, 2) + require.Len(t, key.JSONPlan, 2) expectedYAML := ` - name: education @@ -520,7 +515,6 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) { maxDynamicTypes: 4 isTerminal: true elemType: String - valueType: String dynamic: name: type availableTypes: @@ -529,7 +523,6 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) { maxDynamicPaths: 256 isTerminal: true elemType: String - valueType: String - name: education column: body_json_promoted availableTypes: @@ -553,7 +546,6 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) { maxDynamicPaths: 16 isTerminal: true elemType: String - valueType: String dynamic: name: type availableTypes: @@ -562,39 +554,29 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) { maxDynamicPaths: 256 isTerminal: true elemType: String - valueType: String ` - got := plansToYAML(t, plans) + got := plansToYAML(t, key.JSONPlan) require.YAMLEq(t, expectedYAML, got) }) } func TestPlanJSON_EdgeCases(t *testing.T) { - _, metadataStore := testTypeSet() + types, _ := TestJSONTypeSet() tests := []struct { name string path string - value any expectedYAML string + expectErr bool }{ { - name: "Path with no available types", - path: "unknown.path", - value: "test", - expectedYAML: ` -- name: unknown.path - column: body_json - maxDynamicTypes: 16 - isTerminal: true - elemType: String - valueType: String -`, + name: "Path with no available types", + path: "unknown.path", + expectErr: true, }, { name: "Very deep nesting - validates progression doesn't go negative", path: "interests[].entities[].reviews[].entries[].metadata[].positions[].name", - value: "Engineer", expectedYAML: ` - name: interests column: body_json @@ -637,13 +619,11 @@ func TestPlanJSON_EdgeCases(t *testing.T) { - String isTerminal: true elemType: String - valueType: String `, }, { name: "Path with mixed scalar and array types", path: "education[].type", - value: "high_school", expectedYAML: ` - name: education column: body_json @@ -659,13 +639,11 @@ func TestPlanJSON_EdgeCases(t *testing.T) { maxDynamicTypes: 8 isTerminal: true elemType: String - valueType: String `, }, { name: "Exists with only array types available", path: "education", - value: nil, expectedYAML: ` - name: education column: body_json @@ -681,29 +659,39 @@ func TestPlanJSON_EdgeCases(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Choose key type based on path; operator does not affect the tree shape asserted here. - keyType := telemetrytypes.String + keyType := String switch tt.path { case "education": - keyType = telemetrytypes.ArrayJSON + keyType = ArrayJSON case "education[].type": - keyType = telemetrytypes.String + keyType = String } key := makeKey(tt.path, keyType, false) - plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, tt.value, metadataStore) + err := key.SetJSONAccessPlan(JSONColumnMetadata{ + BaseColumn: "body_json", + PromotedColumn: "body_json_promoted", + }, types) + if tt.expectErr { + require.Error(t, err) + return + } require.NoError(t, err) - got := plansToYAML(t, plans) + got := plansToYAML(t, key.JSONPlan) require.YAMLEq(t, tt.expectedYAML, got) }) } } func TestPlanJSON_TreeStructure(t *testing.T) { - _, metadataStore := testTypeSet() + types, _ := TestJSONTypeSet() path := "education[].awards[].participated[].team[].branch" - key := makeKey(path, telemetrytypes.String, false) - plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, "John", metadataStore) + key := makeKey(path, String, false) + err := key.SetJSONAccessPlan(JSONColumnMetadata{ + BaseColumn: "body_json", + PromotedColumn: "body_json_promoted", + }, types) require.NoError(t, err) - require.Len(t, plans, 1) + require.Len(t, key.JSONPlan, 1) expectedYAML := ` - name: education @@ -739,7 +727,6 @@ func TestPlanJSON_TreeStructure(t *testing.T) { maxDynamicTypes: 1 isTerminal: true elemType: String - valueType: String dynamic: name: team availableTypes: @@ -755,7 +742,6 @@ func TestPlanJSON_TreeStructure(t *testing.T) { maxDynamicPaths: 64 isTerminal: true elemType: String - valueType: String dynamic: name: participated availableTypes: @@ -779,7 +765,6 @@ func TestPlanJSON_TreeStructure(t *testing.T) { maxDynamicPaths: 16 isTerminal: true elemType: String - valueType: String dynamic: name: team availableTypes: @@ -795,89 +780,8 @@ func TestPlanJSON_TreeStructure(t *testing.T) { maxDynamicPaths: 64 isTerminal: true elemType: String - valueType: String ` - got := plansToYAML(t, plans) + got := plansToYAML(t, key.JSONPlan) require.YAMLEq(t, expectedYAML, got) } - -// ============================================================================ -// Test Data Setup -// ============================================================================ - -// testTypeSet returns a map of path->types and a mock MetadataStore for testing -// This represents the type information available in the test JSON structure -func testTypeSet() (map[string][]telemetrytypes.JSONDataType, telemetrytypes.MetadataStore) { - types := map[string][]telemetrytypes.JSONDataType{ - "user.name": {telemetrytypes.String}, - "user.age": {telemetrytypes.Int64, telemetrytypes.String}, - "user.height": {telemetrytypes.Float64}, - "education": {telemetrytypes.ArrayJSON}, - "education[].name": {telemetrytypes.String}, - "education[].type": {telemetrytypes.String, telemetrytypes.Int64}, - "education[].internal_type": {telemetrytypes.String}, - "education[].metadata.location": {telemetrytypes.String}, - "education[].parameters": {telemetrytypes.ArrayFloat64, telemetrytypes.ArrayDynamic}, - "education[].duration": {telemetrytypes.String}, - "education[].mode": {telemetrytypes.String}, - "education[].year": {telemetrytypes.Int64}, - "education[].field": {telemetrytypes.String}, - "education[].awards": {telemetrytypes.ArrayDynamic, telemetrytypes.ArrayJSON}, - "education[].awards[].name": {telemetrytypes.String}, - "education[].awards[].rank": {telemetrytypes.Int64}, - "education[].awards[].medal": {telemetrytypes.String}, - "education[].awards[].type": {telemetrytypes.String}, - "education[].awards[].semester": {telemetrytypes.Int64}, - "education[].awards[].participated": {telemetrytypes.ArrayDynamic, telemetrytypes.ArrayJSON}, - "education[].awards[].participated[].type": {telemetrytypes.String}, - "education[].awards[].participated[].field": {telemetrytypes.String}, - "education[].awards[].participated[].project_type": {telemetrytypes.String}, - "education[].awards[].participated[].project_name": {telemetrytypes.String}, - "education[].awards[].participated[].race_type": {telemetrytypes.String}, - "education[].awards[].participated[].team_based": {telemetrytypes.Bool}, - "education[].awards[].participated[].team_name": {telemetrytypes.String}, - "education[].awards[].participated[].team": {telemetrytypes.ArrayJSON}, - "education[].awards[].participated[].team[].name": {telemetrytypes.String}, - "education[].awards[].participated[].team[].branch": {telemetrytypes.String}, - "education[].awards[].participated[].team[].semester": {telemetrytypes.Int64}, - "interests": {telemetrytypes.ArrayJSON}, - "interests[].type": {telemetrytypes.String}, - "interests[].entities": {telemetrytypes.ArrayJSON}, - "interests[].entities.application_date": {telemetrytypes.String}, - "interests[].entities[].reviews": {telemetrytypes.ArrayJSON}, - "interests[].entities[].reviews[].given_by": {telemetrytypes.String}, - "interests[].entities[].reviews[].remarks": {telemetrytypes.String}, - "interests[].entities[].reviews[].weight": {telemetrytypes.Float64}, - "interests[].entities[].reviews[].passed": {telemetrytypes.Bool}, - "interests[].entities[].reviews[].type": {telemetrytypes.String}, - "interests[].entities[].reviews[].analysis_type": {telemetrytypes.Int64}, - "interests[].entities[].reviews[].entries": {telemetrytypes.ArrayJSON}, - "interests[].entities[].reviews[].entries[].subject": {telemetrytypes.String}, - "interests[].entities[].reviews[].entries[].status": {telemetrytypes.String}, - "interests[].entities[].reviews[].entries[].metadata": {telemetrytypes.ArrayJSON}, - "interests[].entities[].reviews[].entries[].metadata[].company": {telemetrytypes.String}, - "interests[].entities[].reviews[].entries[].metadata[].experience": {telemetrytypes.Int64}, - "interests[].entities[].reviews[].entries[].metadata[].unit": {telemetrytypes.String}, - "interests[].entities[].reviews[].entries[].metadata[].positions": {telemetrytypes.ArrayJSON}, - "interests[].entities[].reviews[].entries[].metadata[].positions[].name": {telemetrytypes.String}, - "interests[].entities[].reviews[].entries[].metadata[].positions[].duration": {telemetrytypes.Int64, telemetrytypes.Float64}, - "interests[].entities[].reviews[].entries[].metadata[].positions[].unit": {telemetrytypes.String}, - "interests[].entities[].reviews[].entries[].metadata[].positions[].ratings": {telemetrytypes.ArrayInt64, telemetrytypes.ArrayString}, - "message": {telemetrytypes.String}, - } - - mockMetadataStore := telemetrytypestest.NewMockMetadataStore() - for path, dataTypes := range types { - for _, dataType := range dataTypes { - mockMetadataStore.SetKey(&telemetrytypes.TelemetryFieldKey{ - Name: path, - JSONDataType: &dataType, - Signal: telemetrytypes.SignalLogs, - FieldContext: telemetrytypes.FieldContextBody, - FieldDataType: telemetrytypes.MappingJSONDataTypeToFieldDataType[dataType], - }) - } - } - return types, mockMetadataStore -} diff --git a/pkg/types/telemetrytypes/test_data.go b/pkg/types/telemetrytypes/test_data.go new file mode 100644 index 0000000000..6ec46c242e --- /dev/null +++ b/pkg/types/telemetrytypes/test_data.go @@ -0,0 +1,71 @@ +package telemetrytypes + +// ============================================================================ +// Test JSON Type Set Data Setup +// ============================================================================ + +// TestJSONTypeSet returns a map of path->types for testing +// This represents the type information available in the test JSON structure +func TestJSONTypeSet() (map[string][]JSONDataType, MetadataStore) { + types := map[string][]JSONDataType{ + "user.name": {String}, + "user.permissions": {ArrayString}, + "user.age": {Int64, String}, + "user.height": {Float64}, + "education": {ArrayJSON}, + "education[].name": {String}, + "education[].type": {String, Int64}, + "education[].internal_type": {String}, + "education[].metadata.location": {String}, + "education[].parameters": {ArrayFloat64, ArrayDynamic}, + "education[].duration": {String}, + "education[].mode": {String}, + "education[].year": {Int64}, + "education[].field": {String}, + "education[].awards": {ArrayDynamic, ArrayJSON}, + "education[].awards[].name": {String}, + "education[].awards[].rank": {Int64}, + "education[].awards[].medal": {String}, + "education[].awards[].type": {String}, + "education[].awards[].semester": {Int64}, + "education[].awards[].participated": {ArrayDynamic, ArrayJSON}, + "education[].awards[].participated[].type": {String}, + "education[].awards[].participated[].field": {String}, + "education[].awards[].participated[].project_type": {String}, + "education[].awards[].participated[].project_name": {String}, + "education[].awards[].participated[].race_type": {String}, + "education[].awards[].participated[].team_based": {Bool}, + "education[].awards[].participated[].team_name": {String}, + "education[].awards[].participated[].team": {ArrayJSON}, + "education[].awards[].participated[].members": {ArrayString}, + "education[].awards[].participated[].team[].name": {String}, + "education[].awards[].participated[].team[].branch": {String}, + "education[].awards[].participated[].team[].semester": {Int64}, + "interests": {ArrayJSON}, + "interests[].type": {String}, + "interests[].entities": {ArrayJSON}, + "interests[].entities.application_date": {String}, + "interests[].entities[].reviews": {ArrayJSON}, + "interests[].entities[].reviews[].given_by": {String}, + "interests[].entities[].reviews[].remarks": {String}, + "interests[].entities[].reviews[].weight": {Float64}, + "interests[].entities[].reviews[].passed": {Bool}, + "interests[].entities[].reviews[].type": {String}, + "interests[].entities[].reviews[].analysis_type": {Int64}, + "interests[].entities[].reviews[].entries": {ArrayJSON}, + "interests[].entities[].reviews[].entries[].subject": {String}, + "interests[].entities[].reviews[].entries[].status": {String}, + "interests[].entities[].reviews[].entries[].metadata": {ArrayJSON}, + "interests[].entities[].reviews[].entries[].metadata[].company": {String}, + "interests[].entities[].reviews[].entries[].metadata[].experience": {Int64}, + "interests[].entities[].reviews[].entries[].metadata[].unit": {String}, + "interests[].entities[].reviews[].entries[].metadata[].positions": {ArrayJSON}, + "interests[].entities[].reviews[].entries[].metadata[].positions[].name": {String}, + "interests[].entities[].reviews[].entries[].metadata[].positions[].duration": {Int64, Float64}, + "interests[].entities[].reviews[].entries[].metadata[].positions[].unit": {String}, + "interests[].entities[].reviews[].entries[].metadata[].positions[].ratings": {ArrayInt64, ArrayString}, + "message": {String}, + } + + return types, nil +}