mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-26 11:50:31 +01:00
Compare commits
1 Commits
nv/v2-dash
...
promooted-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65252943a8 |
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz-otel-collector/utils"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
@@ -76,7 +77,7 @@ func NewFieldMapper(fl flagger.Flagger) qbtypes.FieldMapper {
|
||||
return &fieldMapper{fl: fl}
|
||||
}
|
||||
|
||||
func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
func (m *fieldMapper) getColumns(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
columns := []*schema.Column{logsV2Columns["resources_string"], logsV2Columns["resource"]}
|
||||
@@ -245,7 +246,7 @@ func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetr
|
||||
}
|
||||
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
columns, err := m.getColumn(ctx, key)
|
||||
columns, err := m.getColumns(ctx, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -287,12 +288,14 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
|
||||
return "", qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
expr, err := m.buildFieldForJSON(key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
for _, column := range columns {
|
||||
expr, err := m.buildFieldForJSON(key, column)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
exprs = append(exprs, expr)
|
||||
exprs = append(exprs, expr)
|
||||
}
|
||||
default:
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
}
|
||||
@@ -348,7 +351,7 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnFor(ctx context.Context, _, _ uint64, key *telemetrytypes.TelemetryFieldKey) ([]*schema.Column, error) {
|
||||
return m.getColumn(ctx, key)
|
||||
return m.getColumns(ctx, key)
|
||||
}
|
||||
|
||||
func (m *fieldMapper) ColumnExpressionFor(
|
||||
@@ -403,16 +406,13 @@ func (m *fieldMapper) ColumnExpressionFor(
|
||||
}
|
||||
|
||||
// buildFieldForJSON builds the field expression for body JSON fields using arrayConcat pattern.
|
||||
func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
plan := key.JSONPlan
|
||||
if len(plan) == 0 {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
|
||||
"Could not find any valid paths for: %s", key.Name)
|
||||
func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey, column *schemamigrator.Column) (string, error) {
|
||||
node, err := key.PlanBuilder.Build(*column)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if plan[0].IsTerminal {
|
||||
node := plan[0]
|
||||
|
||||
if node.IsTerminal {
|
||||
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
|
||||
// TODO(Piyush): Promoted path logic commented out. Materialized now means type hint
|
||||
// promotion will be extracted from key field evolution
|
||||
@@ -450,7 +450,7 @@ func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (
|
||||
}
|
||||
|
||||
// Build arrayConcat pattern directly from the tree structure
|
||||
arrayConcatExpr, err := m.buildArrayConcat(plan)
|
||||
arrayConcatExpr, err := m.buildArrayConcat(node)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -459,23 +459,18 @@ func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (
|
||||
}
|
||||
|
||||
// buildArrayConcat builds the arrayConcat pattern directly from the tree structure.
|
||||
func (m *fieldMapper) buildArrayConcat(plan telemetrytypes.JSONAccessPlan) (string, error) {
|
||||
if len(plan) == 0 {
|
||||
return "", errors.Newf(errors.TypeInternal, CodeGroupByPlanEmpty, "group by plan is empty while building arrayConcat")
|
||||
}
|
||||
|
||||
func (m *fieldMapper) buildArrayConcat(node *telemetrytypes.JSONAccessNode) (string, error) {
|
||||
// Build arrayMap expressions for ALL available branches at the root level.
|
||||
// Iterate branches in deterministic order (JSON then Dynamic)
|
||||
var arrayMapExpressions []string
|
||||
for _, node := range plan {
|
||||
for _, branchType := range node.BranchesInOrder() {
|
||||
expr, err := m.buildArrayMap(node, branchType)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
arrayMapExpressions = append(arrayMapExpressions, expr)
|
||||
for _, branchType := range node.BranchesInOrder() {
|
||||
expr, err := m.buildArrayMap(node, branchType)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
arrayMapExpressions = append(arrayMapExpressions, expr)
|
||||
}
|
||||
|
||||
if len(arrayMapExpressions) == 0 {
|
||||
return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat")
|
||||
}
|
||||
|
||||
@@ -33,32 +33,42 @@ func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType te
|
||||
}
|
||||
|
||||
// BuildCondition builds the full WHERE condition for body_v2 JSON paths.
|
||||
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||
baseCond, err := c.emitPlannedCondition(operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// path index
|
||||
if operator.AddDefaultExistsFilter() {
|
||||
pathIndex := fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(LogsV2BodyV2Column), c.key.ArrayParentPaths()[0])
|
||||
return sb.And(baseCond, pathIndex), nil
|
||||
}
|
||||
|
||||
return baseCond, nil
|
||||
}
|
||||
|
||||
func (c *jsonConditionBuilder) emitPlannedCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||
// Build traversal + terminal recursively per-hop
|
||||
conditions := []string{}
|
||||
for _, node := range c.key.JSONPlan {
|
||||
condition, err := c.recurseArrayHops(node, operator, value, sb)
|
||||
func (c *jsonConditionBuilder) buildJSONCondition(columns []schemamigrator.Column, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||
if len(columns) == 1 {
|
||||
baseCond, err := c.emitPlannedCondition(columns[0], operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
conditions = append(conditions, condition)
|
||||
|
||||
// path index
|
||||
if operator.AddDefaultExistsFilter() {
|
||||
pathIndex := fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(LogsV2BodyV2Column), c.key.ArrayParentPaths()[0])
|
||||
return sb.And(baseCond, pathIndex), nil
|
||||
}
|
||||
|
||||
return baseCond, nil
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
func (c *jsonConditionBuilder) emitPlannedCondition(column schemamigrator.Column, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||
if c.key.PlanBuilder == nil {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "no plan builder for key %s", c.key.Name)
|
||||
}
|
||||
|
||||
var conditions []string
|
||||
|
||||
node, err := c.key.PlanBuilder.Build(column)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
cond, err := c.recurseArrayHops(node, operator, value, sb)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
conditions = append(conditions, cond)
|
||||
|
||||
return sb.Or(conditions...), nil
|
||||
}
|
||||
|
||||
|
||||
@@ -85,27 +85,16 @@ func (t *telemetryMetaStore) enrichJSONKeys(ctx context.Context, selectors []*te
|
||||
key.Indexes = indexes[key.Name]
|
||||
}
|
||||
|
||||
// build JSON access plans using the pre-fetched parent type cache
|
||||
return t.buildJSONPlans(filteredKeys, parentTypeCache)
|
||||
}
|
||||
|
||||
// buildJSONPlans builds JSON access plans for the given keys
|
||||
// using the provided parent type cache (pre-fetched in the main UNION query).
|
||||
func (t *telemetryMetaStore) buildJSONPlans(keys []*telemetrytypes.TelemetryFieldKey, typeCache map[string][]telemetrytypes.FieldDataType) error {
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// attach a lazy PlanBuilder so field_mapper can build plans on demand
|
||||
columnMeta := t.jsonColumnMetadata[telemetrytypes.SignalLogs][telemetrytypes.FieldContextBody]
|
||||
for _, key := range keys {
|
||||
if err := key.SetJSONAccessPlan(columnMeta, typeCache); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, key := range filteredKeys {
|
||||
key.PlanBuilder = telemetrytypes.NewFieldPlanBuilder(key, columnMeta, parentTypeCache)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.TelemetryFieldKeySkipIndex, error) {
|
||||
filteredPaths := []string{}
|
||||
for _, path := range paths {
|
||||
|
||||
@@ -37,7 +37,7 @@ type TelemetryFieldKey struct {
|
||||
FieldContext FieldContext `json:"fieldContext,omitzero"`
|
||||
FieldDataType FieldDataType `json:"fieldDataType,omitzero"`
|
||||
|
||||
JSONPlan JSONAccessPlan `json:"-"`
|
||||
PlanBuilder *FieldPlanBuilder `json:"-"`
|
||||
Indexes []TelemetryFieldKeySkipIndex `json:"-"`
|
||||
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
|
||||
|
||||
@@ -120,7 +120,7 @@ func (f *TelemetryFieldKey) OverrideMetadataFrom(src *TelemetryFieldKey) {
|
||||
f.FieldDataType = src.FieldDataType
|
||||
f.Indexes = src.Indexes
|
||||
f.Materialized = src.Materialized
|
||||
f.JSONPlan = src.JSONPlan
|
||||
f.PlanBuilder = src.PlanBuilder
|
||||
f.Evolutions = src.Evolutions
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
@@ -28,8 +29,6 @@ type JSONColumnMetadata struct {
|
||||
PromotedColumn string
|
||||
}
|
||||
|
||||
type JSONAccessPlan = []*JSONAccessNode
|
||||
|
||||
type TerminalConfig struct {
|
||||
Key *TelemetryFieldKey
|
||||
ElemType JSONDataType
|
||||
@@ -99,23 +98,48 @@ func (n *JSONAccessNode) BranchesInOrder() []JSONAccessBranchType {
|
||||
})
|
||||
}
|
||||
|
||||
type planBuilder struct {
|
||||
// FieldPlanBuilder builds JSON access nodes on demand for a specific field key.
|
||||
// It holds all necessary metadata (key, column info, type cache) and produces
|
||||
// the correct root node per column, dispatching between base and promoted plans
|
||||
// by column name — without any external "is promoted?" flag.
|
||||
type FieldPlanBuilder struct {
|
||||
key *TelemetryFieldKey
|
||||
columnInfo JSONColumnMetadata
|
||||
typeCache map[string][]FieldDataType
|
||||
paths []string // cumulative paths for type cache lookups
|
||||
segments []string // individual path segments for node names
|
||||
isPromoted bool
|
||||
typeCache map[string][]FieldDataType
|
||||
}
|
||||
|
||||
func NewFieldPlanBuilder(key *TelemetryFieldKey, info JSONColumnMetadata, typeCache map[string][]FieldDataType) *FieldPlanBuilder {
|
||||
return &FieldPlanBuilder{key: key, columnInfo: info, typeCache: typeCache}
|
||||
}
|
||||
|
||||
// Build dispatches by column name — called by JSONConditionBuilder in field_mapper.
|
||||
func (b *FieldPlanBuilder) Build(column schemamigrator.Column) (*JSONAccessNode, error) {
|
||||
if column.Name == b.columnInfo.BaseColumn {
|
||||
return b.build(NewRootJSONAccessNode(b.columnInfo.BaseColumn, 32, 0))
|
||||
}
|
||||
return b.build(NewRootJSONAccessNode(b.columnInfo.PromotedColumn, 32, 1024))
|
||||
}
|
||||
|
||||
func (b *FieldPlanBuilder) build(root *JSONAccessNode) (*JSONAccessNode, error) {
|
||||
if b.key.Name == "" {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
|
||||
}
|
||||
b.paths = b.key.ArrayParentPaths()
|
||||
b.segments = b.key.ArrayPathSegments()
|
||||
return b.buildPlan(0, root, false)
|
||||
}
|
||||
|
||||
// buildPlan recursively builds the path plan tree.
|
||||
func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChild bool) (*JSONAccessNode, error) {
|
||||
if index >= len(pb.paths) {
|
||||
func (b *FieldPlanBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChild bool) (*JSONAccessNode, error) {
|
||||
if index >= len(b.paths) {
|
||||
return nil, errors.NewInvalidInputf(CodePlanIndexOutOfBounds, "index is out of bounds")
|
||||
}
|
||||
|
||||
pathSoFar := pb.paths[index] // cumulative path for type cache lookup
|
||||
segmentName := pb.segments[index] // segment name for node
|
||||
isTerminal := index == len(pb.paths)-1
|
||||
pathSoFar := b.paths[index] // cumulative path for type cache lookup
|
||||
segmentName := b.segments[index] // segment name for node
|
||||
isTerminal := index == len(b.paths)-1
|
||||
|
||||
// Calculate progression parameters based on parent's values
|
||||
var maxTypes, maxPaths int
|
||||
@@ -149,18 +173,18 @@ func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChil
|
||||
// Configure terminal if this is the last part
|
||||
if isTerminal {
|
||||
// fielddatatype must not be unspecified else expression can not be generated
|
||||
if pb.key.FieldDataType == FieldDataTypeUnspecified {
|
||||
if b.key.FieldDataType == FieldDataTypeUnspecified {
|
||||
return nil, errors.NewInternalf(CodePlanFieldDataTypeMissing, "field data type is missing for path %s", pathSoFar)
|
||||
}
|
||||
|
||||
node.TerminalConfig = &TerminalConfig{
|
||||
Key: pb.key,
|
||||
ElemType: pb.key.GetJSONDataType(),
|
||||
Key: b.key,
|
||||
ElemType: b.key.GetJSONDataType(),
|
||||
}
|
||||
} else {
|
||||
var err error
|
||||
// Use cached types from the batched metadata query
|
||||
types, ok := pb.typeCache[pathSoFar]
|
||||
types, ok := b.typeCache[pathSoFar]
|
||||
if !ok {
|
||||
return nil, errors.NewInternalf(errors.CodeInvalidInput, "types missing for path %s", pathSoFar)
|
||||
}
|
||||
@@ -172,13 +196,13 @@ func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChil
|
||||
}
|
||||
|
||||
if hasJSON {
|
||||
node.Branches[BranchJSON], err = pb.buildPlan(index+1, node, false)
|
||||
node.Branches[BranchJSON], err = b.buildPlan(index+1, node, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if hasDynamic {
|
||||
node.Branches[BranchDynamic], err = pb.buildPlan(index+1, node, true)
|
||||
node.Branches[BranchDynamic], err = b.buildPlan(index+1, node, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -187,45 +211,3 @@ func (pb *planBuilder) buildPlan(index int, parent *JSONAccessNode, isDynArrChil
|
||||
|
||||
return node, nil
|
||||
}
|
||||
|
||||
// buildJSONAccessPlan builds a tree structure representing the complete JSON path traversal
|
||||
// that precomputes all possible branches and their types.
|
||||
func (key *TelemetryFieldKey) SetJSONAccessPlan(columnInfo JSONColumnMetadata, typeCache map[string][]FieldDataType,
|
||||
) error {
|
||||
// if path is empty, return nil
|
||||
if key.Name == "" {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
|
||||
}
|
||||
|
||||
pb := &planBuilder{
|
||||
key: key,
|
||||
paths: key.ArrayParentPaths(),
|
||||
segments: key.ArrayPathSegments(),
|
||||
isPromoted: key.Materialized,
|
||||
typeCache: typeCache,
|
||||
}
|
||||
|
||||
node, err := pb.buildPlan(0,
|
||||
NewRootJSONAccessNode(columnInfo.BaseColumn,
|
||||
32, 0),
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key.JSONPlan = append(key.JSONPlan, node)
|
||||
|
||||
if pb.isPromoted {
|
||||
node, err := pb.buildPlan(0,
|
||||
NewRootJSONAccessNode(columnInfo.PromotedColumn,
|
||||
32, 1024),
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key.JSONPlan = append(key.JSONPlan, node)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user