Compare commits

...

6 Commits

Author SHA1 Message Date
Piyush Singariya
252330321a chore: nearing wrapping up 2026-06-01 15:54:06 +05:30
Piyush Singariya
1bfc20582c fix: field mapper update 2026-06-01 15:40:34 +05:30
Piyush Singariya
ece9d45fc5 fix: plan test 2026-06-01 12:03:58 +05:30
Piyush Singariya
ca6199fbed chore: field mapper changes 2026-06-01 10:38:27 +05:30
Piyush Singariya
7f0adb5b2b chore: working on filter 2026-05-29 14:19:04 +05:30
Piyush Singariya
65252943a8 chore: bridging field evolution with promoted 2026-05-26 16:09:06 +05:30
10 changed files with 521 additions and 400 deletions

View File

@@ -38,17 +38,14 @@ func (c *conditionBuilder) conditionFor(
return "", err
}
// TODO(Piyush): Update this to support multiple JSON columns based on evolutions
for _, column := range columns {
// TODO(Tushar): thread orgID here to evaluate correctly
if column.Type.GetType() == schema.ColumnTypeEnumJSON && key.FieldContext == telemetrytypes.FieldContextBody && c.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(valuer.UUID{})) && key.Name != messageSubField {
valueType, value := InferDataType(value, operator, key)
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
if err != nil {
return "", err
}
return cond, nil
// TODO(Tushar): thread orgID here to evaluate correctly
if key.FieldContext == telemetrytypes.FieldContextBody && c.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(valuer.UUID{})) && key.Name != messageSubField {
valueType, value := InferDataType(value, operator, key)
cond, err := NewJSONConditionBuilder(key, valueType, logsV2Columns[LogsV2BodyV2Column]).buildJSONCondition(startNs, endNs, columns, operator, value, sb)
if err != nil {
return "", err
}
return cond, nil
}
if operator.IsStringSearchOperator() {

View File

@@ -69,14 +69,15 @@ var (
)
type fieldMapper struct {
fl flagger.Flagger
fl flagger.Flagger
jsonMapper *JSONFieldMapper
}
func NewFieldMapper(fl flagger.Flagger) qbtypes.FieldMapper {
return &fieldMapper{fl: fl}
return &fieldMapper{fl: fl, jsonMapper: NewJSONFieldMapper()}
}
func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
func (m *fieldMapper) getColumns(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
columns := []*schema.Column{logsV2Columns["resources_string"], logsV2Columns["resource"]}
@@ -105,7 +106,7 @@ func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.Telemet
if key.Name == messageSubField {
return []*schema.Column{logsV2Columns[messageSubColumn]}, nil
}
return []*schema.Column{logsV2Columns[LogsV2BodyV2Column]}, nil
return []*schema.Column{logsV2Columns[LogsV2BodyV2Column], logsV2Columns[LogsV2BodyPromotedColumn]}, nil
}
// Fall back to legacy body column
return []*schema.Column{logsV2Columns["body"]}, nil
@@ -118,13 +119,9 @@ func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.Telemet
if !ok {
// check if the key has body JSON search
if strings.HasPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix) {
// Use body_v2 if feature flag is enabled and we have a body condition builder
// TODO(Tushar): thread orgID here to evaluate correctly
if m.fl.BooleanOrEmpty(ctx, flagger.FeatureUseJSONBody, featuretypes.NewFlaggerEvaluationContext(valuer.UUID{})) {
// TODO(Piyush): Update this to support multiple JSON columns based on evolutions
// i.e return both the body json and body json promoted and let the evolutions decide which one to use
// based on the query range time.
return []*schema.Column{logsV2Columns[LogsV2BodyV2Column]}, nil
return []*schema.Column{logsV2Columns[LogsV2BodyV2Column], logsV2Columns[LogsV2BodyPromotedColumn]}, nil
}
// Fall back to legacy body column
return []*schema.Column{logsV2Columns["body"]}, nil
@@ -145,12 +142,15 @@ func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.Telemet
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
// - Results are sorted by ReleaseTime descending (newest first)
func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
copy(sortedEvolutions, evolutions)
// sort the evolutions by ReleaseTime ascending
// Sort by ReleaseTime ascending; break ties by Version ascending so that at the
// same timestamp the highest-versioned entry is last and becomes the base anchor.
sort.Slice(sortedEvolutions, func(i, j int) bool {
if sortedEvolutions[i].ReleaseTime.Equal(sortedEvolutions[j].ReleaseTime) {
return sortedEvolutions[i].Version < sortedEvolutions[j].Version
}
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
})
@@ -199,6 +199,12 @@ func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetr
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
continue
}
// At the same timestamp, a lower-versioned entry is superseded by the higher-versioned
// base (e.g. body v0 is superseded by body_v2 v1 at epoch).
if evolution.ReleaseTime.Equal(latestBaseEvolutionAcrossAll.ReleaseTime) &&
evolution.Version < latestBaseEvolutionAcrossAll.Version {
continue
}
// skip evolutions after tsEndTime
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
continue
@@ -245,7 +251,7 @@ func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetr
}
func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
columns, err := m.getColumn(ctx, key)
columns, err := m.getColumns(ctx, key)
if err != nil {
return "", err
}
@@ -287,12 +293,18 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
return "", qbtypes.ErrColumnNotFound
}
expr, err := m.buildFieldForJSON(key)
// Use `column` from the outer newColumns loop (already evolution-filtered).
expr, err := m.jsonMapper.FieldExprFor(key, column)
if err != nil {
return "", err
}
exprs = append(exprs, expr)
exist, err := m.jsonMapper.ExistExprFor(key, column)
if err != nil {
return "", err
}
existExpr = append(existExpr, exist)
default:
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
}
@@ -348,7 +360,7 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
}
func (m *fieldMapper) ColumnFor(ctx context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
return m.getColumn(ctx, key)
return m.getColumns(ctx, key)
}
func (m *fieldMapper) ColumnExpressionFor(
@@ -401,145 +413,3 @@ func (m *fieldMapper) ColumnExpressionFor(
return fmt.Sprintf("%s AS `%s`", sqlbuilder.Escape(fieldExpression), field.Name), nil
}
// buildFieldForJSON builds the field expression for body JSON fields using arrayConcat pattern.
func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (string, error) {
plan := key.JSONPlan
if len(plan) == 0 {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
"Could not find any valid paths for: %s", key.Name)
}
if plan[0].IsTerminal {
node := plan[0]
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
// TODO(Piyush): Promoted path logic commented out. Materialized now means type hint
// promotion will be extracted from key field evolution
// (direct sub-column access), not a promoted body_promoted.* column.
// if key.Materialized {
// if len(plan) < 2 {
// return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
// "plan length is less than 2 for promoted path: %s", key.Name)
// }
// node := plan[1]
// promotedExpr := fmt.Sprintf(
// "dynamicElement(%s, '%s')",
// node.FieldPath(),
// node.TerminalConfig.ElemType.StringValue(),
// )
// // dynamicElement returns NULL for scalar types or an empty array for array types.
// if node.TerminalConfig.ElemType.IsArray {
// expr = fmt.Sprintf(
// "if(length(%s) > 0, %s, %s)",
// promotedExpr,
// promotedExpr,
// expr,
// )
// } else {
// // promoted column first then body_json column
// // TODO(Piyush): Change this in future for better performance
// expr = fmt.Sprintf("coalesce(%s, %s)", promotedExpr, expr)
// }
// }
return expr, nil
}
// Build arrayConcat pattern directly from the tree structure
arrayConcatExpr, err := m.buildArrayConcat(plan)
if err != nil {
return "", err
}
return arrayConcatExpr, nil
}
// buildArrayConcat builds the arrayConcat pattern directly from the tree structure.
func (m *fieldMapper) buildArrayConcat(plan telemetrytypes.JSONAccessPlan) (string, error) {
if len(plan) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeGroupByPlanEmpty, "group by plan is empty while building arrayConcat")
}
// Build arrayMap expressions for ALL available branches at the root level.
// Iterate branches in deterministic order (JSON then Dynamic)
var arrayMapExpressions []string
for _, node := range plan {
for _, branchType := range node.BranchesInOrder() {
expr, err := m.buildArrayMap(node, branchType)
if err != nil {
return "", err
}
arrayMapExpressions = append(arrayMapExpressions, expr)
}
}
if len(arrayMapExpressions) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat")
}
// Build the arrayConcat expression
arrayConcatExpr := fmt.Sprintf("arrayConcat(%s)", strings.Join(arrayMapExpressions, ", "))
// Wrap with arrayFlatten
arrayFlattenExpr := fmt.Sprintf("arrayFlatten(%s)", arrayConcatExpr)
return arrayFlattenExpr, nil
}
// buildArrayMap builds the arrayMap expression for a specific branch, handling all sub-branches.
func (m *fieldMapper) buildArrayMap(currentNode *telemetrytypes.JSONAccessNode, branchType telemetrytypes.JSONAccessBranchType) (string, error) {
if currentNode == nil {
return "", errors.Newf(errors.TypeInternal, CodeCurrentNodeNil, "current node is nil while building arrayMap")
}
childNode := currentNode.Branches[branchType]
if childNode == nil {
return "", errors.Newf(errors.TypeInternal, CodeChildNodeNil, "child node is nil while building arrayMap")
}
// Build the array expression for this level
var arrayExpr string
if branchType == telemetrytypes.BranchJSON {
// Array(JSON) branch
arrayExpr = fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')",
currentNode.FieldPath(), currentNode.MaxDynamicTypes, currentNode.MaxDynamicPaths)
} else {
// Array(Dynamic) branch - filter for JSON objects
dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", currentNode.FieldPath())
arrayExpr = fmt.Sprintf("arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr)
}
// If this is the terminal level, return the simple arrayMap
if childNode.IsTerminal {
dynamicElementExpr := fmt.Sprintf("dynamicElement(%s, '%s')", childNode.FieldPath(),
childNode.TerminalConfig.ElemType.StringValue(),
)
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), dynamicElementExpr, arrayExpr), nil
}
// For non-terminal nodes, we need to handle ALL possible branches at the next level.
// Use deterministic branch order so generated SQL is stable across environments.
var nestedExpressions []string
for _, branchType := range childNode.BranchesInOrder() {
expr, err := m.buildArrayMap(childNode, branchType)
if err != nil {
return "", err
}
nestedExpressions = append(nestedExpressions, expr)
}
// If we have multiple nested expressions, we need to concat them
var nestedExpr string
if len(nestedExpressions) == 1 {
nestedExpr = nestedExpressions[0]
} else if len(nestedExpressions) > 1 {
nestedExpr = fmt.Sprintf("arrayConcat(%s)", strings.Join(nestedExpressions, ", "))
} else {
return "", errors.Newf(errors.TypeInternal, CodeNestedExpressionsEmpty, "nested expressions are empty while building arrayMap")
}
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), nestedExpr, arrayExpr), nil
}

View File

@@ -23,42 +23,73 @@ var (
CodeArrayNavigationFailed = errors.MustNewCode("array_navigation_failed")
)
// JSON body fields are stored across two columns:
// - Base column (body_v2): present from the start, always carries a value,
// and is the only column with skip-indexes (e.g. JSON paths bloom filter).
// - Promoted column (body_promoted): introduced later via a schema evolution;
// its field set is a subset of the base column at the time of promotion.
type jsonConditionBuilder struct {
key *telemetrytypes.TelemetryFieldKey
valueType telemetrytypes.JSONDataType
key *telemetrytypes.TelemetryFieldKey
valueType telemetrytypes.JSONDataType
baseColumn *schemamigrator.Column
}
func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType telemetrytypes.FieldDataType) *jsonConditionBuilder {
return &jsonConditionBuilder{key: key, valueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType]}
func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType telemetrytypes.FieldDataType, baseColumn *schemamigrator.Column) *jsonConditionBuilder {
return &jsonConditionBuilder{key: key, valueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType], baseColumn: baseColumn}
}
// BuildCondition builds the full WHERE condition for body_v2 JSON paths.
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
baseCond, err := c.emitPlannedCondition(operator, value, sb)
if err != nil {
return "", err
}
// path index
if operator.AddDefaultExistsFilter() {
pathIndex := fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(LogsV2BodyV2Column), c.key.ArrayParentPaths()[0])
return sb.And(baseCond, pathIndex), nil
}
return baseCond, nil
}
func (c *jsonConditionBuilder) emitPlannedCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
// Build traversal + terminal recursively per-hop
conditions := []string{}
for _, node := range c.key.JSONPlan {
condition, err := c.recurseArrayHops(node, operator, value, sb)
func (c *jsonConditionBuilder) buildJSONCondition(startNs, endNs uint64, columns []*schemamigrator.Column, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if len(c.key.Evolutions) > 0 {
filtered, _, err := selectEvolutionsForColumns(columns, c.key.Evolutions, startNs, endNs)
if err != nil {
return "", err
}
conditions = append(conditions, condition)
columns = filtered
}
columnConditions := make([]string, 0, len(columns))
for _, col := range columns {
cond, err := c.emitPlannedCondition(col, operator, value, sb)
if err != nil {
return "", err
}
columnConditions = append(columnConditions, cond)
}
var baseCondition string
if len(columnConditions) == 1 {
baseCondition = columnConditions[0]
} else {
baseCondition = sb.Or(columnConditions...)
}
// path index is only available on the base column
if operator.AddDefaultExistsFilter() {
return sb.And(baseCondition, fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(c.baseColumn.Name), c.key.ArrayParentPaths()[0])), nil
}
return baseCondition, nil
}
func (c *jsonConditionBuilder) emitPlannedCondition(column *schemamigrator.Column, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if c.key.PlanBuilder == nil {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "no plan builder for key %s", c.key.Name)
}
var conditions []string
node, err := c.key.PlanBuilder.Build(column)
if err != nil {
return "", err
}
cond, err := c.recurseArrayHops(node, operator, value, sb)
if err != nil {
return "", err
}
conditions = append(conditions, cond)
return sb.Or(conditions...), nil
}

View File

@@ -0,0 +1,114 @@
package telemetrylogs
import (
"fmt"
"strings"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
type JSONFieldMapper struct{}
func NewJSONFieldMapper() *JSONFieldMapper { return &JSONFieldMapper{} }
// FieldExprFor returns the ClickHouse expression that reads the field value
// from the given JSON column — a dynamicElement(...) scalar for primitives or
// an arrayFlatten(arrayConcat(...)) for array paths.
func (j *JSONFieldMapper) FieldExprFor(key *telemetrytypes.TelemetryFieldKey, column *schemamigrator.Column) (string, error) {
node, err := key.PlanBuilder.Build(column)
if err != nil {
return "", err
}
if node.IsTerminal {
return fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue()), nil
}
return j.buildArrayConcat(node)
}
// ExistExprFor returns a boolean ClickHouse expression that evaluates to true
// when the field is present in the given column. Used as the dispatch predicate
// inside multiIf when multiple columns are active for a time range.
func (j *JSONFieldMapper) ExistExprFor(key *telemetrytypes.TelemetryFieldKey, column *schemamigrator.Column) (string, error) {
node, err := key.PlanBuilder.Build(column)
if err != nil {
return "", err
}
if node.IsTerminal {
dynamicExpr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
if node.TerminalConfig.ElemType.IsArray {
return fmt.Sprintf("length(%s) > 0", dynamicExpr), nil
}
return fmt.Sprintf("%s IS NOT NULL", dynamicExpr), nil
}
arrayConcatExpr, err := j.buildArrayConcat(node)
if err != nil {
return "", err
}
return fmt.Sprintf("length(%s) > 0", arrayConcatExpr), nil
}
// buildArrayConcat produces an arrayFlatten(arrayConcat(...)) expression that
// collects every matching element across all JSON / Dynamic branches.
func (j *JSONFieldMapper) buildArrayConcat(node *telemetrytypes.JSONAccessNode) (string, error) {
var arrayMapExprs []string
for _, branchType := range node.BranchesInOrder() {
expr, err := j.buildArrayMap(node, branchType)
if err != nil {
return "", err
}
arrayMapExprs = append(arrayMapExprs, expr)
}
if len(arrayMapExprs) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat")
}
return fmt.Sprintf("arrayFlatten(arrayConcat(%s))", strings.Join(arrayMapExprs, ", ")), nil
}
// buildArrayMap builds a single arrayMap expression for one branch (JSON or
// Dynamic) at the current node level, recursing into deeper levels as needed.
func (j *JSONFieldMapper) buildArrayMap(currentNode *telemetrytypes.JSONAccessNode, branchType telemetrytypes.JSONAccessBranchType) (string, error) {
if currentNode == nil {
return "", errors.Newf(errors.TypeInternal, CodeCurrentNodeNil, "current node is nil while building arrayMap")
}
childNode := currentNode.Branches[branchType]
if childNode == nil {
return "", errors.Newf(errors.TypeInternal, CodeChildNodeNil, "child node is nil while building arrayMap")
}
var arrayExpr string
if branchType == telemetrytypes.BranchJSON {
arrayExpr = fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')",
currentNode.FieldPath(), currentNode.MaxDynamicTypes, currentNode.MaxDynamicPaths)
} else {
dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", currentNode.FieldPath())
arrayExpr = fmt.Sprintf("arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr)
}
if childNode.IsTerminal {
dynamicElementExpr := fmt.Sprintf("dynamicElement(%s, '%s')", childNode.FieldPath(), childNode.TerminalConfig.ElemType.StringValue())
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), dynamicElementExpr, arrayExpr), nil
}
var nestedExprs []string
for _, bt := range childNode.BranchesInOrder() {
expr, err := j.buildArrayMap(childNode, bt)
if err != nil {
return "", err
}
nestedExprs = append(nestedExprs, expr)
}
var nestedExpr string
switch len(nestedExprs) {
case 0:
return "", errors.Newf(errors.TypeInternal, CodeNestedExpressionsEmpty, "nested expressions are empty while building arrayMap")
case 1:
nestedExpr = nestedExprs[0]
default:
nestedExpr = fmt.Sprintf("arrayConcat(%s)", strings.Join(nestedExprs, ", "))
}
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), nestedExpr, arrayExpr), nil
}

File diff suppressed because one or more lines are too long

View File

@@ -85,27 +85,16 @@ func (t *telemetryMetaStore) enrichJSONKeys(ctx context.Context, selectors []*te
key.Indexes = indexes[key.Name]
}
// build JSON access plans using the pre-fetched parent type cache
return t.buildJSONPlans(filteredKeys, parentTypeCache)
}
// buildJSONPlans builds JSON access plans for the given keys
// using the provided parent type cache (pre-fetched in the main UNION query).
func (t *telemetryMetaStore) buildJSONPlans(keys []*telemetrytypes.TelemetryFieldKey, typeCache map[string][]telemetrytypes.FieldDataType) error {
if len(keys) == 0 {
return nil
}
// attach a lazy PlanBuilder so field_mapper can build plans on demand
columnMeta := t.jsonColumnMetadata[telemetrytypes.SignalLogs][telemetrytypes.FieldContextBody]
for _, key := range keys {
if err := key.SetJSONAccessPlan(columnMeta, typeCache); err != nil {
return err
}
for _, key := range filteredKeys {
key.PlanBuilder = telemetrytypes.NewFieldPlanBuilder(key, columnMeta, parentTypeCache)
}
return nil
}
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
filteredPaths := []string{}
for _, path := range paths {

View File

@@ -37,7 +37,7 @@ type TelemetryFieldKey struct {
FieldContext FieldContext `json:"fieldContext,omitzero"`
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
JSONPlan JSONAccessPlan `json:"-"`
PlanBuilder *FieldPlanBuilder `json:"-"`
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
@@ -120,7 +120,7 @@ func (f *TelemetryFieldKey) OverrideMetadataFrom(src *TelemetryFieldKey) {
f.FieldDataType = src.FieldDataType
f.Indexes = src.Indexes
f.Materialized = src.Materialized
f.JSONPlan = src.JSONPlan
f.PlanBuilder = src.PlanBuilder
f.Evolutions = src.Evolutions
}

View File

@@ -6,6 +6,7 @@ import (
"slices"
"strings"
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -28,8 +29,6 @@ type JSONColumnMetadata struct {
PromotedColumn string
}
type JSONAccessPlan = []*JSONAccessNode
type TerminalConfig struct {
Key *TelemetryFieldKey
ElemType JSONDataType
@@ -99,23 +98,48 @@ func (n *JSONAccessNode) BranchesInOrder() []JSONAccessBranchType {
})
}
type planBuilder struct {
// FieldPlanBuilder builds JSON access nodes on demand for a specific field key.
// It holds all necessary metadata (key, column info, type cache) and produces
// the correct root node per column, dispatching between base and promoted plans
// by column name — without any external "is promoted?" flag.
type FieldPlanBuilder struct {
key *TelemetryFieldKey
columnInfo JSONColumnMetadata
typeCache map[string][]FieldDataType
paths []string // cumulative paths for type cache lookups
segments []string // individual path segments for node names
isPromoted bool
typeCache map[string][]FieldDataType
}
func NewFieldPlanBuilder(key *TelemetryFieldKey, info JSONColumnMetadata, typeCache map[string][]FieldDataType) *FieldPlanBuilder {
return &FieldPlanBuilder{key: key, columnInfo: info, typeCache: typeCache}
}
// Build dispatches by column name — called by JSONConditionBuilder in field_mapper.
func (b *FieldPlanBuilder) Build(column *schemamigrator.Column) (*JSONAccessNode, error) {
if column.Name == b.columnInfo.BaseColumn {
return b.build(NewRootJSONAccessNode(b.columnInfo.BaseColumn, 32, 0))
}
return b.build(NewRootJSONAccessNode(b.columnInfo.PromotedColumn, 32, 1024))
}
func (b *FieldPlanBuilder) build(root *JSONAccessNode) (*JSONAccessNode, error) {
if b.key.Name == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
}
b.paths = b.key.ArrayParentPaths()
b.segments = b.key.ArrayPathSegments()
return b.buildPlan(0, root, false)
}
// buildPlan recursively builds the path plan tree.
func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChild bool) (*JSONAccessNode, error) {
if index >= len(pb.paths) {
func (b *FieldPlanBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChild bool) (*JSONAccessNode, error) {
if index >= len(b.paths) {
return nil, errors.NewInvalidInputf(CodePlanIndexOutOfBounds, "index is out of bounds")
}
pathSoFar := pb.paths[index] // cumulative path for type cache lookup
segmentName := pb.segments[index] // segment name for node
isTerminal := index == len(pb.paths)-1
pathSoFar := b.paths[index] // cumulative path for type cache lookup
segmentName := b.segments[index] // segment name for node
isTerminal := index == len(b.paths)-1
// Calculate progression parameters based on parent's values
var maxTypes, maxPaths int
@@ -149,18 +173,18 @@ func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChil
// Configure terminal if this is the last part
if isTerminal {
// fielddatatype must not be unspecified else expression can not be generated
if pb.key.FieldDataType == FieldDataTypeUnspecified {
if b.key.FieldDataType == FieldDataTypeUnspecified {
return nil, errors.NewInternalf(CodePlanFieldDataTypeMissing, "field data type is missing for path %s", pathSoFar)
}
node.TerminalConfig = &TerminalConfig{
Key: pb.key,
ElemType: pb.key.GetJSONDataType(),
Key: b.key,
ElemType: b.key.GetJSONDataType(),
}
} else {
var err error
// Use cached types from the batched metadata query
types, ok := pb.typeCache[pathSoFar]
types, ok := b.typeCache[pathSoFar]
if !ok {
return nil, errors.NewInternalf(errors.CodeInvalidInput, "types missing for path %s", pathSoFar)
}
@@ -172,13 +196,13 @@ func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChil
}
if hasJSON {
node.Branches[BranchJSON], err = pb.buildPlan(index+1, node, false)
node.Branches[BranchJSON], err = b.buildPlan(index+1, node, false)
if err != nil {
return nil, err
}
}
if hasDynamic {
node.Branches[BranchDynamic], err = pb.buildPlan(index+1, node, true)
node.Branches[BranchDynamic], err = b.buildPlan(index+1, node, true)
if err != nil {
return nil, err
}
@@ -187,45 +211,3 @@ func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChil
return node, nil
}
// buildJSONAccessPlan builds a tree structure representing the complete JSON path traversal
// that precomputes all possible branches and their types.
func (key *TelemetryFieldKey) SetJSONAccessPlan(columnInfo JSONColumnMetadata, typeCache map[string][]FieldDataType,
) error {
// if path is empty, return nil
if key.Name == "" {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
}
pb := &planBuilder{
key: key,
paths: key.ArrayParentPaths(),
segments: key.ArrayPathSegments(),
isPromoted: key.Materialized,
typeCache: typeCache,
}
node, err := pb.buildPlan(0,
NewRootJSONAccessNode(columnInfo.BaseColumn,
32, 0),
false,
)
if err != nil {
return err
}
key.JSONPlan = append(key.JSONPlan, node)
if pb.isPromoted {
node, err := pb.buildPlan(0,
NewRootJSONAccessNode(columnInfo.PromotedColumn,
32, 1024),
true,
)
if err != nil {
return err
}
key.JSONPlan = append(key.JSONPlan, node)
}
return nil
}

View File

@@ -80,6 +80,29 @@ func toTestNode(n *JSONAccessNode) *jsonAccessTestNode {
return out
}
// buildTestPlans sets PlanBuilder on the key and returns the pre-built access plans.
// Non-materialized keys produce one plan (base column); materialized keys produce two (base + promoted).
func buildTestPlans(t *testing.T, key *TelemetryFieldKey, meta JSONColumnMetadata, types map[string][]FieldDataType) ([]*JSONAccessNode, error) {
t.Helper()
key.PlanBuilder = NewFieldPlanBuilder(key, meta, types)
basePlan, err := key.PlanBuilder.build(NewRootJSONAccessNode(meta.BaseColumn, 32, 0))
if err != nil {
return nil, err
}
plans := []*JSONAccessNode{basePlan}
if key.Materialized {
promotedPlan, err := key.PlanBuilder.build(NewRootJSONAccessNode(meta.PromotedColumn, 32, 1024))
if err != nil {
return nil, err
}
plans = append(plans, promotedPlan)
}
return plans, nil
}
// plansToYAML converts a slice of JSONAccessNode plans to a YAML string that
// can be compared against a per-test expectedTree.
func plansToYAML(t *testing.T, plans []*JSONAccessNode) string {
@@ -267,19 +290,17 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
},
}
meta := JSONColumnMetadata{BaseColumn: bodyV2Column, PromotedColumn: bodyPromotedColumn}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
plans, err := buildTestPlans(t, tt.key, meta, types)
if tt.expectErr {
require.Error(t, err)
require.Nil(t, tt.key.JSONPlan)
require.Nil(t, plans)
return
}
require.NoError(t, err)
got := plansToYAML(t, tt.key.JSONPlan)
got := plansToYAML(t, plans)
require.YAMLEq(t, tt.expectedYAML, got)
})
}
@@ -383,17 +404,15 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
},
}
meta := JSONColumnMetadata{BaseColumn: bodyV2Column, PromotedColumn: bodyPromotedColumn}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
key := makeKey(tt.path, FieldDataTypeString, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
plans, err := buildTestPlans(t, key, meta, types)
require.NoError(t, err)
require.NotNil(t, key.JSONPlan)
require.Len(t, key.JSONPlan, 1)
got := plansToYAML(t, key.JSONPlan)
require.NotNil(t, plans)
require.Len(t, plans, 1)
got := plansToYAML(t, plans)
require.YAMLEq(t, tt.expectedYAML, got)
})
}
@@ -403,14 +422,13 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
types, _ := TestJSONTypeSet()
path := "education[].awards[].type"
meta := JSONColumnMetadata{BaseColumn: bodyV2Column, PromotedColumn: bodyPromotedColumn}
t.Run("Non-promoted plan", func(t *testing.T) {
key := makeKey(path, FieldDataTypeString, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
plans, err := buildTestPlans(t, key, meta, types)
require.NoError(t, err)
require.Len(t, key.JSONPlan, 1)
require.Len(t, plans, 1)
expectedYAML := fmt.Sprintf(`
- name: education
@@ -433,18 +451,15 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
isTerminal: true
elemType: String
`, bodyV2Column)
got := plansToYAML(t, key.JSONPlan)
got := plansToYAML(t, plans)
require.YAMLEq(t, expectedYAML, got)
})
t.Run("Promoted plan", func(t *testing.T) {
key := makeKey(path, FieldDataTypeString, true)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
plans, err := buildTestPlans(t, key, meta, types)
require.NoError(t, err)
require.Len(t, key.JSONPlan, 2)
require.Len(t, plans, 2)
expectedYAML := fmt.Sprintf(`
- name: education
@@ -489,7 +504,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
isTerminal: true
elemType: String
`, bodyV2Column, bodyPromotedColumn)
got := plansToYAML(t, key.JSONPlan)
got := plansToYAML(t, plans)
require.YAMLEq(t, expectedYAML, got)
})
}
@@ -580,7 +595,7 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
keyType = FieldDataTypeString
}
key := makeKey(tt.path, keyType, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
plans, err := buildTestPlans(t, key, JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
@@ -589,7 +604,7 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
return
}
require.NoError(t, err)
got := plansToYAML(t, key.JSONPlan)
got := plansToYAML(t, plans)
require.YAMLEq(t, tt.expectedYAML, got)
})
}
@@ -599,12 +614,12 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
types, _ := TestJSONTypeSet()
path := "education[].awards[].participated[].team[].branch"
key := makeKey(path, FieldDataTypeString, false)
err := key.SetJSONAccessPlan(JSONColumnMetadata{
plans, err := buildTestPlans(t, key, JSONColumnMetadata{
BaseColumn: bodyV2Column,
PromotedColumn: bodyPromotedColumn,
}, types)
require.NoError(t, err)
require.Len(t, key.JSONPlan, 1)
require.Len(t, plans, 1)
expectedYAML := fmt.Sprintf(`
- name: education
@@ -668,6 +683,6 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
elemType: String
`, bodyV2Column)
got := plansToYAML(t, key.JSONPlan)
got := plansToYAML(t, plans)
require.YAMLEq(t, expectedYAML, got)
}

View File

@@ -61,6 +61,9 @@ func TestJSONTypeSet() (map[string][]FieldDataType, MetadataStore) {
"http-events": {FieldDataTypeArrayJSON},
"http-events[].request-info.host": {FieldDataTypeString},
// ── top-level arrays ─────────────────────────────────────────────
"tags": {FieldDataTypeArrayString},
// ── top-level primitives ──────────────────────────────────────────
"message": {FieldDataTypeString},
"http-status": {FieldDataTypeInt64, FieldDataTypeString}, // hyphen in root key, ambiguous