mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-01 18:10:21 +01:00
Compare commits
20 Commits
refactor/c
...
fix/array-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e7f4a04b36 | ||
|
|
0687634da3 | ||
|
|
7e7732243e | ||
|
|
2f952e402f | ||
|
|
a12febca4a | ||
|
|
cb71c9c3f7 | ||
|
|
1cd4ce6509 | ||
|
|
9299c8ab18 | ||
|
|
24749de269 | ||
|
|
39098ec3f4 | ||
|
|
fe554f5c94 | ||
|
|
8a60a041a6 | ||
|
|
541f19c34a | ||
|
|
010db03d6e | ||
|
|
5408acbd8c | ||
|
|
0de6c85f81 | ||
|
|
69ec24fa05 | ||
|
|
539d732b65 | ||
|
|
843d5fb199 | ||
|
|
fabdfb8cc1 |
@@ -277,6 +277,18 @@ func DataTypeCollisionHandledFieldName(key *telemetrytypes.TelemetryFieldKey, va
|
|||||||
tblFieldName, value = castString(tblFieldName), toStrings(v)
|
tblFieldName, value = castString(tblFieldName), toStrings(v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case telemetrytypes.FieldDataTypeArrayDynamic:
|
||||||
|
switch v := value.(type) {
|
||||||
|
case string:
|
||||||
|
tblFieldName = castString(tblFieldName)
|
||||||
|
case float32, float64, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
|
||||||
|
tblFieldName = accurateCastFloat(tblFieldName)
|
||||||
|
case bool:
|
||||||
|
tblFieldName = castBool(tblFieldName)
|
||||||
|
case []any:
|
||||||
|
// dynamic array elements will be default casted to string
|
||||||
|
tblFieldName, value = castString(tblFieldName), toStrings(v)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return tblFieldName, value
|
return tblFieldName, value
|
||||||
}
|
}
|
||||||
@@ -284,6 +296,10 @@ func DataTypeCollisionHandledFieldName(key *telemetrytypes.TelemetryFieldKey, va
|
|||||||
func castFloat(col string) string { return fmt.Sprintf("toFloat64OrNull(%s)", col) }
|
func castFloat(col string) string { return fmt.Sprintf("toFloat64OrNull(%s)", col) }
|
||||||
func castFloatHack(col string) string { return fmt.Sprintf("toFloat64(%s)", col) }
|
func castFloatHack(col string) string { return fmt.Sprintf("toFloat64(%s)", col) }
|
||||||
func castString(col string) string { return fmt.Sprintf("toString(%s)", col) }
|
func castString(col string) string { return fmt.Sprintf("toString(%s)", col) }
|
||||||
|
func castBool(col string) string { return fmt.Sprintf("accurateCastOrNull(%s, 'Bool')", col) }
|
||||||
|
func accurateCastFloat(col string) string {
|
||||||
|
return fmt.Sprintf("accurateCastOrNull(%s, 'Float64')", col)
|
||||||
|
}
|
||||||
|
|
||||||
func allFloats(in []any) bool {
|
func allFloats(in []any) bool {
|
||||||
for _, x := range in {
|
for _, x := range in {
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package telemetrylogs
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
|
"strings"
|
||||||
|
|
||||||
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||||
"github.com/SigNoz/signoz/pkg/errors"
|
"github.com/SigNoz/signoz/pkg/errors"
|
||||||
@@ -33,182 +34,34 @@ func NewJSONConditionBuilder(key *telemetrytypes.TelemetryFieldKey, valueType te
|
|||||||
|
|
||||||
// BuildCondition builds the full WHERE condition for body_v2 JSON paths.
|
// BuildCondition builds the full WHERE condition for body_v2 JSON paths.
|
||||||
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
func (c *jsonConditionBuilder) buildJSONCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||||
conditions := []string{}
|
baseCond, err := c.emitPlannedCondition(operator, value, sb)
|
||||||
for _, node := range c.key.JSONPlan {
|
if err != nil {
|
||||||
condition, err := c.emitPlannedCondition(node, operator, value, sb)
|
return "", err
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
conditions = append(conditions, condition)
|
|
||||||
}
|
}
|
||||||
baseCond := sb.Or(conditions...)
|
|
||||||
|
|
||||||
// path index
|
// path index
|
||||||
if operator.AddDefaultExistsFilter() {
|
if operator.AddDefaultExistsFilter() {
|
||||||
pathIndex := fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(LogsV2BodyV2Column), c.key.ArrayParentPaths()[0])
|
pathIndex := fmt.Sprintf(`has(%s, '%s')`, schemamigrator.JSONPathsIndexExpr(LogsV2BodyV2Column), c.key.ArrayParentPaths()[0])
|
||||||
return sb.And(baseCond, pathIndex), nil
|
return sb.And(baseCond, pathIndex), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return baseCond, nil
|
return baseCond, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// emitPlannedCondition handles paths with array traversal.
|
func (c *jsonConditionBuilder) emitPlannedCondition(operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||||
func (c *jsonConditionBuilder) emitPlannedCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
|
||||||
// Build traversal + terminal recursively per-hop
|
// Build traversal + terminal recursively per-hop
|
||||||
compiled, err := c.recurseArrayHops(node, operator, value, sb)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return compiled, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// buildTerminalCondition creates the innermost condition.
|
|
||||||
func (c *jsonConditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
|
||||||
if node.TerminalConfig.ElemType.IsArray {
|
|
||||||
conditions := []string{}
|
|
||||||
// if the value type is not an array
|
|
||||||
// TODO(piyush): Confirm the Query built for Array case and add testcases for it later
|
|
||||||
if !c.valueType.IsArray {
|
|
||||||
// if operator is a String search Operator, then we need to build one more String comparison condition along with the Strict match condition
|
|
||||||
if operator.IsStringSearchOperator() {
|
|
||||||
formattedValue := querybuilder.FormatValueForContains(value)
|
|
||||||
arrayCond, err := c.buildArrayMembershipCondition(node, operator, formattedValue, sb)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
conditions = append(conditions, arrayCond)
|
|
||||||
}
|
|
||||||
|
|
||||||
// switch operator for array membership checks
|
|
||||||
switch operator {
|
|
||||||
case qbtypes.FilterOperatorContains:
|
|
||||||
operator = qbtypes.FilterOperatorEqual
|
|
||||||
case qbtypes.FilterOperatorNotContains:
|
|
||||||
operator = qbtypes.FilterOperatorNotEqual
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
arrayCond, err := c.buildArrayMembershipCondition(node, operator, value, sb)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
conditions = append(conditions, arrayCond)
|
|
||||||
// or the conditions together
|
|
||||||
return sb.Or(conditions...), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.buildPrimitiveTerminalCondition(node, operator, value, sb)
|
|
||||||
}
|
|
||||||
|
|
||||||
// buildPrimitiveTerminalCondition builds the condition if the terminal node is a primitive type
|
|
||||||
// it handles the data type collisions and utilizes indexes for the condition if available.
|
|
||||||
func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
|
||||||
fieldPath := node.FieldPath()
|
|
||||||
conditions := []string{}
|
conditions := []string{}
|
||||||
var formattedValue = value
|
for _, node := range c.key.JSONPlan {
|
||||||
if operator.IsStringSearchOperator() {
|
condition, err := c.recurseArrayHops(node, operator, value, sb)
|
||||||
formattedValue = querybuilder.FormatValueForContains(value)
|
|
||||||
}
|
|
||||||
|
|
||||||
elemType := node.TerminalConfig.ElemType
|
|
||||||
fieldExpr := fmt.Sprintf("dynamicElement(%s, '%s')", fieldPath, elemType.StringValue())
|
|
||||||
fieldExpr, formattedValue = querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, formattedValue, fieldExpr, operator)
|
|
||||||
|
|
||||||
// utilize indexes for the condition if available
|
|
||||||
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.JSONDataTypeIndex) bool {
|
|
||||||
return index.Type == elemType && index.ColumnExpression == fieldPath
|
|
||||||
})
|
|
||||||
if elemType.IndexSupported && indexed {
|
|
||||||
indexedExpr := assumeNotNull(fieldPath, elemType)
|
|
||||||
emptyValue := func() any {
|
|
||||||
switch elemType {
|
|
||||||
case telemetrytypes.String:
|
|
||||||
return ""
|
|
||||||
case telemetrytypes.Int64, telemetrytypes.Float64, telemetrytypes.Bool:
|
|
||||||
return 0
|
|
||||||
default:
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// switch the operator and value for exists and not exists
|
|
||||||
switch operator {
|
|
||||||
case qbtypes.FilterOperatorExists:
|
|
||||||
operator = qbtypes.FilterOperatorNotEqual
|
|
||||||
value = emptyValue
|
|
||||||
case qbtypes.FilterOperatorNotExists:
|
|
||||||
operator = qbtypes.FilterOperatorEqual
|
|
||||||
value = emptyValue
|
|
||||||
default:
|
|
||||||
// do nothing
|
|
||||||
}
|
|
||||||
|
|
||||||
indexedExpr, indexedComparisonValue := querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, formattedValue, indexedExpr, operator)
|
|
||||||
cond, err := c.applyOperator(sb, indexedExpr, operator, indexedComparisonValue)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
conditions = append(conditions, condition)
|
||||||
// if qb has a definitive value, we can skip adding a condition to
|
|
||||||
// check the existence of the path in the json column
|
|
||||||
if value != emptyValue {
|
|
||||||
return cond, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
conditions = append(conditions, cond)
|
|
||||||
// Switch operator to EXISTS since indexed paths on assumedNotNull, indexes will always have a default value
|
|
||||||
// So we flip the operator to Exists and filter the rows that actually have the value
|
|
||||||
operator = qbtypes.FilterOperatorExists
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cond, err := c.applyOperator(sb, fieldExpr, operator, formattedValue)
|
return sb.Or(conditions...), nil
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
conditions = append(conditions, cond)
|
|
||||||
if len(conditions) > 1 {
|
|
||||||
return sb.And(conditions...), nil
|
|
||||||
}
|
|
||||||
return conditions[0], nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildArrayMembershipCondition handles array membership checks.
|
|
||||||
func (c *jsonConditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
|
||||||
arrayPath := node.FieldPath()
|
|
||||||
localKeyCopy := *node.TerminalConfig.Key
|
|
||||||
// create typed array out of a dynamic array
|
|
||||||
filteredDynamicExpr := func() string {
|
|
||||||
// Change the field data type from []dynamic to the value type
|
|
||||||
// since we've filtered the value type out of the dynamic array, we need to change the field data corresponding to the value type
|
|
||||||
localKeyCopy.FieldDataType = telemetrytypes.MappingJSONDataTypeToFieldDataType[telemetrytypes.ScalerTypeToArrayType[c.valueType]]
|
|
||||||
|
|
||||||
baseArrayDynamicExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", arrayPath)
|
|
||||||
return fmt.Sprintf("arrayMap(x->dynamicElement(x, '%s'), arrayFilter(x->(dynamicType(x) = '%s'), %s))",
|
|
||||||
c.valueType.StringValue(),
|
|
||||||
c.valueType.StringValue(),
|
|
||||||
baseArrayDynamicExpr)
|
|
||||||
}
|
|
||||||
typedArrayExpr := func() string {
|
|
||||||
return fmt.Sprintf("dynamicElement(%s, '%s')", arrayPath, node.TerminalConfig.ElemType.StringValue())
|
|
||||||
}
|
|
||||||
|
|
||||||
var arrayExpr string
|
|
||||||
if node.TerminalConfig.ElemType == telemetrytypes.ArrayDynamic {
|
|
||||||
arrayExpr = filteredDynamicExpr()
|
|
||||||
} else {
|
|
||||||
arrayExpr = typedArrayExpr()
|
|
||||||
}
|
|
||||||
|
|
||||||
key := "x"
|
|
||||||
fieldExpr, value := querybuilder.DataTypeCollisionHandledFieldName(&localKeyCopy, value, key, operator)
|
|
||||||
op, err := c.applyOperator(sb, fieldExpr, operator, value)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Sprintf("arrayExists(%s -> %s, %s)", key, op, arrayExpr), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// recurseArrayHops recursively builds array traversal conditions.
|
|
||||||
func (c *jsonConditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
func (c *jsonConditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||||
if current == nil {
|
if current == nil {
|
||||||
return "", errors.NewInternalf(CodeArrayNavigationFailed, "navigation failed, current node is nil")
|
return "", errors.NewInternalf(CodeArrayNavigationFailed, "navigation failed, current node is nil")
|
||||||
@@ -222,6 +75,33 @@ func (c *jsonConditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAcce
|
|||||||
return terminalCond, nil
|
return terminalCond, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// apply NOT at top level arrayExists so that any subsequent arrayExists fails we count it as true (matching log)
|
||||||
|
yes, operator := applyNotCondition(operator)
|
||||||
|
condition, err := c.buildAccessNodeBranches(current, operator, value, sb)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if yes {
|
||||||
|
return sb.Not(condition), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return condition, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyNotCondition(operator qbtypes.FilterOperator) (bool, qbtypes.FilterOperator) {
|
||||||
|
if operator.IsNegativeOperator() {
|
||||||
|
return true, operator.Inverse()
|
||||||
|
}
|
||||||
|
return false, operator
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildAccessNodeBranches builds conditions for each branch of the access node
|
||||||
|
func (c *jsonConditionBuilder) buildAccessNodeBranches(current *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||||
|
if current == nil {
|
||||||
|
return "", errors.NewInternalf(CodeArrayNavigationFailed, "navigation failed, current node is nil")
|
||||||
|
}
|
||||||
|
|
||||||
currAlias := current.Alias()
|
currAlias := current.Alias()
|
||||||
fieldPath := current.FieldPath()
|
fieldPath := current.FieldPath()
|
||||||
// Determine availability of Array(JSON) and Array(Dynamic) at this hop
|
// Determine availability of Array(JSON) and Array(Dynamic) at this hop
|
||||||
@@ -256,6 +136,200 @@ func (c *jsonConditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAcce
|
|||||||
return sb.Or(branches...), nil
|
return sb.Or(branches...), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildTerminalCondition creates the innermost condition
|
||||||
|
func (c *jsonConditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||||
|
if node.TerminalConfig.ElemType.IsArray {
|
||||||
|
// Note: here applyNotCondition will return true only if; top level path is an array; and operator is a negative operator
|
||||||
|
// Otherwise this code will be triggered by buildAccessNodeBranches; Where operator would've been already inverted if needed.
|
||||||
|
yes, operator := applyNotCondition(operator)
|
||||||
|
cond, err := c.buildTerminalArrayCondition(node, operator, value, sb)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if yes {
|
||||||
|
return sb.Not(cond), nil
|
||||||
|
}
|
||||||
|
return cond, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.buildPrimitiveTerminalCondition(node, operator, value, sb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEmptyValue(elemType telemetrytypes.JSONDataType) any {
|
||||||
|
switch elemType {
|
||||||
|
case telemetrytypes.String:
|
||||||
|
return ""
|
||||||
|
case telemetrytypes.Int64, telemetrytypes.Float64, telemetrytypes.Bool:
|
||||||
|
return 0
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *jsonConditionBuilder) terminalIndexedCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||||
|
fieldPath := node.FieldPath()
|
||||||
|
if strings.Contains(fieldPath, telemetrytypes.ArraySepSuffix) {
|
||||||
|
return "", errors.NewInternalf(CodeArrayNavigationFailed, "can not build index condition for array field %s", fieldPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
elemType := node.TerminalConfig.ElemType
|
||||||
|
dynamicExpr := fmt.Sprintf("dynamicElement(%s, '%s')", fieldPath, elemType.StringValue())
|
||||||
|
indexedExpr := assumeNotNull(dynamicExpr)
|
||||||
|
|
||||||
|
// switch the operator and value for exists and not exists
|
||||||
|
switch operator {
|
||||||
|
case qbtypes.FilterOperatorExists:
|
||||||
|
operator = qbtypes.FilterOperatorNotEqual
|
||||||
|
value = getEmptyValue(elemType)
|
||||||
|
case qbtypes.FilterOperatorNotExists:
|
||||||
|
operator = qbtypes.FilterOperatorEqual
|
||||||
|
value = getEmptyValue(elemType)
|
||||||
|
default:
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
indexedExpr, formattedValue := querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, value, indexedExpr, operator)
|
||||||
|
cond, err := c.applyOperator(sb, indexedExpr, operator, formattedValue)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return cond, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildPrimitiveTerminalCondition builds the condition if the terminal node is a primitive type
|
||||||
|
// it handles the data type collisions and utilizes indexes for the condition if available
|
||||||
|
func (c *jsonConditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||||
|
fieldPath := node.FieldPath()
|
||||||
|
conditions := []string{}
|
||||||
|
|
||||||
|
// utilize indexes for the condition if available
|
||||||
|
//
|
||||||
|
// Note: Indexing code doesn't get executed for Array Nested fields because they can not be indexed
|
||||||
|
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.JSONDataTypeIndex) bool {
|
||||||
|
return index.Type == node.TerminalConfig.ElemType
|
||||||
|
})
|
||||||
|
if node.TerminalConfig.ElemType.IndexSupported && indexed {
|
||||||
|
indexCond, err := c.terminalIndexedCondition(node, operator, value, sb)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
// if qb has a definitive value, we can skip adding a condition to
|
||||||
|
// check the existence of the path in the json column
|
||||||
|
if value != nil && value != getEmptyValue(node.TerminalConfig.ElemType) {
|
||||||
|
return indexCond, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
conditions = append(conditions, indexCond)
|
||||||
|
|
||||||
|
// Switch operator to EXISTS except when operator is NOT EXISTS since
|
||||||
|
// indexed paths on assumedNotNull, indexes will always have a default
|
||||||
|
// value so we flip the operator to Exists and filter the rows that
|
||||||
|
// actually have the value
|
||||||
|
if operator != qbtypes.FilterOperatorNotExists {
|
||||||
|
operator = qbtypes.FilterOperatorExists
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var formattedValue any = value
|
||||||
|
if operator.IsStringSearchOperator() {
|
||||||
|
formattedValue = querybuilder.FormatValueForContains(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
fieldExpr := fmt.Sprintf("dynamicElement(%s, '%s')", fieldPath, node.TerminalConfig.ElemType.StringValue())
|
||||||
|
|
||||||
|
// if operator is negative and has a value comparison i.e. excluding EXISTS and NOT EXISTS, we need to assume that the field exists everywhere
|
||||||
|
//
|
||||||
|
// Note: here applyNotCondition will return true only if; top level path is being queried and operator is a negative operator
|
||||||
|
// Otherwise this code will be triggered by buildAccessNodeBranches; Where operator would've been already inverted if needed.
|
||||||
|
if node.IsNonNestedPath() {
|
||||||
|
yes, _ := applyNotCondition(operator)
|
||||||
|
if yes {
|
||||||
|
switch operator {
|
||||||
|
case qbtypes.FilterOperatorNotExists:
|
||||||
|
// skip
|
||||||
|
default:
|
||||||
|
fieldExpr = assumeNotNull(fieldExpr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fieldExpr, formattedValue = querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, formattedValue, fieldExpr, operator)
|
||||||
|
cond, err := c.applyOperator(sb, fieldExpr, operator, formattedValue)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
conditions = append(conditions, cond)
|
||||||
|
if len(conditions) > 1 {
|
||||||
|
return sb.And(conditions...), nil
|
||||||
|
}
|
||||||
|
return conditions[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *jsonConditionBuilder) buildTerminalArrayCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||||
|
conditions := []string{}
|
||||||
|
// if operator is a String search Operator, then we need to build one more String comparison condition along with the Strict match condition
|
||||||
|
if operator.IsStringSearchOperator() {
|
||||||
|
formattedValue := querybuilder.FormatValueForContains(value)
|
||||||
|
arrayCond, err := c.buildArrayMembershipCondition(node, operator, formattedValue, sb)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
conditions = append(conditions, arrayCond)
|
||||||
|
|
||||||
|
// switch operator for array membership checks
|
||||||
|
switch operator {
|
||||||
|
case qbtypes.FilterOperatorContains:
|
||||||
|
operator = qbtypes.FilterOperatorEqual
|
||||||
|
case qbtypes.FilterOperatorNotContains:
|
||||||
|
operator = qbtypes.FilterOperatorNotEqual
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
arrayCond, err := c.buildArrayMembershipCondition(node, operator, value, sb)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
conditions = append(conditions, arrayCond)
|
||||||
|
if len(conditions) > 1 {
|
||||||
|
return sb.Or(conditions...), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return conditions[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildArrayMembershipCondition builds condition of the part where Arrays becomes primitive typed Arrays
|
||||||
|
// e.g. [300, 404, 500], and value operations will work on the array elements
|
||||||
|
func (c *jsonConditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
|
||||||
|
arrayPath := node.FieldPath()
|
||||||
|
// create typed array out of a dynamic array
|
||||||
|
filteredDynamicExpr := func() string {
|
||||||
|
baseArrayDynamicExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", arrayPath)
|
||||||
|
return fmt.Sprintf("arrayFilter(x->(dynamicType(x) IN ('String', 'Int64', 'Float64', 'Bool')), %s)",
|
||||||
|
baseArrayDynamicExpr)
|
||||||
|
}
|
||||||
|
typedArrayExpr := func() string {
|
||||||
|
return fmt.Sprintf("dynamicElement(%s, '%s')", arrayPath, node.TerminalConfig.ElemType.StringValue())
|
||||||
|
}
|
||||||
|
|
||||||
|
var arrayExpr string
|
||||||
|
if node.TerminalConfig.ElemType == telemetrytypes.ArrayDynamic {
|
||||||
|
arrayExpr = filteredDynamicExpr()
|
||||||
|
} else {
|
||||||
|
arrayExpr = typedArrayExpr()
|
||||||
|
}
|
||||||
|
|
||||||
|
key := "x"
|
||||||
|
fieldExpr, value := querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, value, key, operator)
|
||||||
|
op, err := c.applyOperator(sb, fieldExpr, operator, value)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("arrayExists(%s -> %s, %s)", key, op, arrayExpr), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *jsonConditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, fieldExpr string, operator qbtypes.FilterOperator, value any) (string, error) {
|
func (c *jsonConditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, fieldExpr string, operator qbtypes.FilterOperator, value any) (string, error) {
|
||||||
switch operator {
|
switch operator {
|
||||||
case qbtypes.FilterOperatorEqual:
|
case qbtypes.FilterOperatorEqual:
|
||||||
@@ -317,6 +391,6 @@ func (c *jsonConditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, field
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func assumeNotNull(column string, elemType telemetrytypes.JSONDataType) string {
|
func assumeNotNull(fieldExpr string) string {
|
||||||
return fmt.Sprintf("assumeNotNull(dynamicElement(%s, '%s'))", column, elemType.StringValue())
|
return fmt.Sprintf("assumeNotNull(%s)", fieldExpr)
|
||||||
}
|
}
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
@@ -113,6 +113,29 @@ const (
|
|||||||
FilterOperatorNotContains
|
FilterOperatorNotContains
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var operatorInverseMapping = map[FilterOperator]FilterOperator{
|
||||||
|
FilterOperatorEqual: FilterOperatorNotEqual,
|
||||||
|
FilterOperatorNotEqual: FilterOperatorEqual,
|
||||||
|
FilterOperatorGreaterThan: FilterOperatorLessThanOrEq,
|
||||||
|
FilterOperatorGreaterThanOrEq: FilterOperatorLessThan,
|
||||||
|
FilterOperatorLessThan: FilterOperatorGreaterThanOrEq,
|
||||||
|
FilterOperatorLessThanOrEq: FilterOperatorGreaterThan,
|
||||||
|
FilterOperatorLike: FilterOperatorNotLike,
|
||||||
|
FilterOperatorNotLike: FilterOperatorLike,
|
||||||
|
FilterOperatorILike: FilterOperatorNotILike,
|
||||||
|
FilterOperatorNotILike: FilterOperatorILike,
|
||||||
|
FilterOperatorBetween: FilterOperatorNotBetween,
|
||||||
|
FilterOperatorNotBetween: FilterOperatorBetween,
|
||||||
|
FilterOperatorIn: FilterOperatorNotIn,
|
||||||
|
FilterOperatorNotIn: FilterOperatorIn,
|
||||||
|
FilterOperatorExists: FilterOperatorNotExists,
|
||||||
|
FilterOperatorNotExists: FilterOperatorExists,
|
||||||
|
FilterOperatorRegexp: FilterOperatorNotRegexp,
|
||||||
|
FilterOperatorNotRegexp: FilterOperatorRegexp,
|
||||||
|
FilterOperatorContains: FilterOperatorNotContains,
|
||||||
|
FilterOperatorNotContains: FilterOperatorContains,
|
||||||
|
}
|
||||||
|
|
||||||
// AddDefaultExistsFilter returns true if addl exists filter should be added to the query
|
// AddDefaultExistsFilter returns true if addl exists filter should be added to the query
|
||||||
// For the negative predicates, we don't want to add the exists filter. Why?
|
// For the negative predicates, we don't want to add the exists filter. Why?
|
||||||
// Say for example, user adds a filter `service.name != "redis"`, we can't interpret it
|
// Say for example, user adds a filter `service.name != "redis"`, we can't interpret it
|
||||||
@@ -162,6 +185,10 @@ func (f FilterOperator) IsNegativeOperator() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f FilterOperator) Inverse() FilterOperator {
|
||||||
|
return operatorInverseMapping[f]
|
||||||
|
}
|
||||||
|
|
||||||
func (f FilterOperator) IsComparisonOperator() bool {
|
func (f FilterOperator) IsComparisonOperator() bool {
|
||||||
switch f {
|
switch f {
|
||||||
case FilterOperatorGreaterThan, FilterOperatorGreaterThanOrEq, FilterOperatorLessThan, FilterOperatorLessThanOrEq:
|
case FilterOperatorGreaterThan, FilterOperatorGreaterThanOrEq, FilterOperatorLessThan, FilterOperatorLessThanOrEq:
|
||||||
|
|||||||
@@ -90,6 +90,11 @@ func (n *JSONAccessNode) FieldPath() string {
|
|||||||
return n.Parent.Alias() + "." + key
|
return n.Parent.Alias() + "." + key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns true if the current node is a non-nested path
|
||||||
|
func (n *JSONAccessNode) IsNonNestedPath() bool {
|
||||||
|
return !strings.Contains(n.FieldPath(), ArraySep)
|
||||||
|
}
|
||||||
|
|
||||||
func (n *JSONAccessNode) BranchesInOrder() []JSONAccessBranchType {
|
func (n *JSONAccessNode) BranchesInOrder() []JSONAccessBranchType {
|
||||||
return slices.SortedFunc(maps.Keys(n.Branches), func(a, b JSONAccessBranchType) int {
|
return slices.SortedFunc(maps.Keys(n.Branches), func(a, b JSONAccessBranchType) int {
|
||||||
return strings.Compare(b.StringValue(), a.StringValue())
|
return strings.Compare(b.StringValue(), a.StringValue())
|
||||||
|
|||||||
@@ -8,65 +8,101 @@ package telemetrytypes
|
|||||||
// This represents the type information available in the test JSON structure.
|
// This represents the type information available in the test JSON structure.
|
||||||
func TestJSONTypeSet() (map[string][]JSONDataType, MetadataStore) {
|
func TestJSONTypeSet() (map[string][]JSONDataType, MetadataStore) {
|
||||||
types := map[string][]JSONDataType{
|
types := map[string][]JSONDataType{
|
||||||
"user.name": {String},
|
|
||||||
"user.permissions": {ArrayString},
|
// ── user (primitives) ─────────────────────────────────────────────
|
||||||
"user.age": {Int64, String},
|
"user.name": {String},
|
||||||
"user.height": {Float64},
|
"user.permissions": {ArrayString},
|
||||||
"education": {ArrayJSON},
|
"user.age": {Int64, String}, // Int64/String ambiguity
|
||||||
"education[].name": {String},
|
"user.height": {Float64},
|
||||||
"education[].type": {String, Int64},
|
"user.active": {Bool}, // Bool — not IndexSupported
|
||||||
"education[].internal_type": {String},
|
|
||||||
"education[].metadata.location": {String},
|
// Deeper non-array nesting (a.b.c — no array hops)
|
||||||
"education[].parameters": {ArrayFloat64, ArrayDynamic},
|
"user.address.zip": {Int64},
|
||||||
"education[].duration": {String},
|
|
||||||
"education[].mode": {String},
|
// ── education[] ───────────────────────────────────────────────────
|
||||||
"education[].year": {Int64},
|
// Pattern: x[].y
|
||||||
"education[].field": {String},
|
"education": {ArrayJSON},
|
||||||
"education[].awards": {ArrayDynamic, ArrayJSON},
|
"education[].name": {String},
|
||||||
"education[].awards[].name": {String},
|
"education[].type": {String, Int64},
|
||||||
"education[].awards[].rank": {Int64},
|
"education[].year": {Int64},
|
||||||
"education[].awards[].medal": {String},
|
"education[].scores": {ArrayInt64},
|
||||||
"education[].awards[].type": {String},
|
"education[].parameters": {ArrayFloat64, ArrayDynamic},
|
||||||
"education[].awards[].semester": {Int64},
|
|
||||||
"education[].awards[].participated": {ArrayDynamic, ArrayJSON},
|
// Pattern: x[].y[]
|
||||||
"education[].awards[].participated[].type": {String},
|
"education[].awards": {ArrayDynamic, ArrayJSON},
|
||||||
"education[].awards[].participated[].field": {String},
|
|
||||||
"education[].awards[].participated[].project_type": {String},
|
// Pattern: x[].y[].z
|
||||||
"education[].awards[].participated[].project_name": {String},
|
"education[].awards[].name": {String},
|
||||||
"education[].awards[].participated[].race_type": {String},
|
"education[].awards[].type": {String},
|
||||||
"education[].awards[].participated[].team_based": {Bool},
|
"education[].awards[].semester": {Int64},
|
||||||
"education[].awards[].participated[].team_name": {String},
|
|
||||||
"education[].awards[].participated[].team": {ArrayJSON},
|
// Pattern: x[].y[].z[]
|
||||||
"education[].awards[].participated[].members": {ArrayString},
|
"education[].awards[].participated": {ArrayDynamic, ArrayJSON},
|
||||||
"education[].awards[].participated[].team[].name": {String},
|
|
||||||
"education[].awards[].participated[].team[].branch": {String},
|
// Pattern: x[].y[].z[].w
|
||||||
"education[].awards[].participated[].team[].semester": {Int64},
|
"education[].awards[].participated[].members": {ArrayString},
|
||||||
"interests": {ArrayJSON},
|
|
||||||
"interests[].type": {String},
|
// Pattern: x[].y[].z[].w[]
|
||||||
"interests[].entities": {ArrayJSON},
|
"education[].awards[].participated[].team": {ArrayJSON},
|
||||||
"interests[].entities.application_date": {String},
|
|
||||||
"interests[].entities[].reviews": {ArrayJSON},
|
// Pattern: x[].y[].z[].w[].v
|
||||||
"interests[].entities[].reviews[].given_by": {String},
|
"education[].awards[].participated[].team[].branch": {String},
|
||||||
"interests[].entities[].reviews[].remarks": {String},
|
|
||||||
"interests[].entities[].reviews[].weight": {Float64},
|
// ── interests[] ───────────────────────────────────────────────────
|
||||||
"interests[].entities[].reviews[].passed": {Bool},
|
"interests": {ArrayJSON},
|
||||||
"interests[].entities[].reviews[].type": {String},
|
"interests[].entities": {ArrayJSON},
|
||||||
"interests[].entities[].reviews[].analysis_type": {Int64},
|
"interests[].entities[].reviews": {ArrayJSON},
|
||||||
"interests[].entities[].reviews[].entries": {ArrayJSON},
|
"interests[].entities[].reviews[].entries": {ArrayJSON},
|
||||||
"interests[].entities[].reviews[].entries[].subject": {String},
|
"interests[].entities[].reviews[].entries[].metadata": {ArrayJSON},
|
||||||
"interests[].entities[].reviews[].entries[].status": {String},
|
"interests[].entities[].reviews[].entries[].metadata[].positions": {ArrayJSON},
|
||||||
"interests[].entities[].reviews[].entries[].metadata": {ArrayJSON},
|
"interests[].entities[].reviews[].entries[].metadata[].positions[].name": {String},
|
||||||
"interests[].entities[].reviews[].entries[].metadata[].company": {String},
|
"interests[].entities[].reviews[].entries[].metadata[].positions[].ratings": {ArrayInt64, ArrayString},
|
||||||
"interests[].entities[].reviews[].entries[].metadata[].experience": {Int64},
|
"http-events": {ArrayJSON},
|
||||||
"interests[].entities[].reviews[].entries[].metadata[].unit": {String},
|
"http-events[].request-info.host": {String},
|
||||||
"interests[].entities[].reviews[].entries[].metadata[].positions": {ArrayJSON},
|
"ids": {ArrayDynamic},
|
||||||
"interests[].entities[].reviews[].entries[].metadata[].positions[].name": {String},
|
|
||||||
"interests[].entities[].reviews[].entries[].metadata[].positions[].duration": {Int64, Float64},
|
// ── top-level primitives ──────────────────────────────────────────
|
||||||
"interests[].entities[].reviews[].entries[].metadata[].positions[].unit": {String},
|
"message": {String},
|
||||||
"interests[].entities[].reviews[].entries[].metadata[].positions[].ratings": {ArrayInt64, ArrayString},
|
"http-status": {Int64, String}, // hyphen in root key, ambiguous
|
||||||
"message": {String},
|
|
||||||
"tags": {ArrayString},
|
// ── top-level nested objects (no array hops) ───────────────────────
|
||||||
|
"response.time-taken": {Float64}, // hyphen inside nested key
|
||||||
}
|
}
|
||||||
|
|
||||||
return types, nil
|
return types, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestIndexedPathEntry is a path + JSON type pair representing a field
|
||||||
|
// backed by a ClickHouse skip index in the test data.
|
||||||
|
//
|
||||||
|
// Only non-array paths with IndexSupported types (String, Int64, Float64)
|
||||||
|
// are valid entries — arrays and Bool cannot carry a skip index.
|
||||||
|
//
|
||||||
|
// The ColumnExpression for each entry is computed at test-setup time from
|
||||||
|
// the access plan, since it depends on the column name (e.g. body_v2)
|
||||||
|
// which is unknown to this package.
|
||||||
|
type TestIndexedPathEntry struct {
|
||||||
|
Path string
|
||||||
|
Type JSONDataType
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestIndexedPaths lists path+type pairs from TestJSONTypeSet that are
|
||||||
|
// backed by a JSON data type index. Test setup uses this to populate
|
||||||
|
// key.Indexes after calling SetJSONAccessPlan.
|
||||||
|
//
|
||||||
|
// Intentionally excluded:
|
||||||
|
// - user.active → Bool, IndexSupported=false
|
||||||
|
var TestIndexedPaths = []TestIndexedPathEntry{
|
||||||
|
// user primitives
|
||||||
|
{Path: "user.name", Type: String},
|
||||||
|
|
||||||
|
// user.address — deeper non-array nesting
|
||||||
|
{Path: "user.address.zip", Type: Int64},
|
||||||
|
|
||||||
|
// root-level with special characters
|
||||||
|
{Path: "http-status", Type: Int64},
|
||||||
|
{Path: "http-status", Type: String},
|
||||||
|
|
||||||
|
// root-level nested objects (no array hops)
|
||||||
|
{Path: "response.time-taken", Type: Float64},
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user