Compare commits

...

1 Commits

Author SHA1 Message Date
Piyush Singariya
77d72cabd0 feat: router 2026-01-29 13:09:01 +05:30
5 changed files with 582 additions and 0 deletions

View File

@@ -0,0 +1,144 @@
package telemetrylogs
import (
"context"
"fmt"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
// CompositeColumnStrategy handles condition building for composite columns,
// which are logical fields that span multiple physical columns.
// Examples: "body" field that can be body, body_json, or body_json_promoted.
type CompositeColumnStrategy struct {
fm qbtypes.FieldMapper
}
func NewCompositeColumnStrategy(fm qbtypes.FieldMapper) *CompositeColumnStrategy {
return &CompositeColumnStrategy{fm: fm}
}
func (s *CompositeColumnStrategy) CanHandle(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, column *schema.Column) bool {
// Check if this is a composite column by looking it up in the logical field registry
_, exists := logicalFieldRegistry[key.Name]
return exists
}
func (s *CompositeColumnStrategy) BuildCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
metadata, exists := logicalFieldRegistry[key.Name]
if !exists {
// Should not happen if CanHandle returned true, but handle gracefully
return "", qbtypes.ErrColumnNotFound
}
// Check if there's a custom condition handler for this operator
if metadata.Conditions != nil {
if handler, ok := metadata.Conditions[operator]; ok {
return handler(sb, value)
}
}
// Fallback: use the expression from the logical field and apply standard operators
expr := metadata.Expr()
// Format value for string search operators
if operator.IsStringSearchOperator() {
value = querybuilder.FormatValueForContains(value)
}
// Apply standard operators using the logical field expression
return s.buildConditionForExpression(sb, expr, operator, value)
}
// buildConditionForExpression builds conditions for a given expression string.
// This is used by composite columns that have their own expressions.
func (s *CompositeColumnStrategy) buildConditionForExpression(
sb *sqlbuilder.SelectBuilder,
expr string,
operator qbtypes.FilterOperator,
value any,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorEqual:
return sb.E(expr, value), nil
case qbtypes.FilterOperatorNotEqual:
return sb.NE(expr, value), nil
case qbtypes.FilterOperatorGreaterThan:
return sb.G(expr, value), nil
case qbtypes.FilterOperatorGreaterThanOrEq:
return sb.GE(expr, value), nil
case qbtypes.FilterOperatorLessThan:
return sb.LT(expr, value), nil
case qbtypes.FilterOperatorLessThanOrEq:
return sb.LE(expr, value), nil
case qbtypes.FilterOperatorLike:
return sb.Like(expr, value), nil
case qbtypes.FilterOperatorNotLike:
return sb.NotLike(expr, value), nil
case qbtypes.FilterOperatorILike:
return sb.ILike(expr, value), nil
case qbtypes.FilterOperatorNotILike:
return sb.NotILike(expr, value), nil
case qbtypes.FilterOperatorContains:
return sb.ILike(expr, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorNotContains:
return sb.NotILike(expr, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorRegexp:
return fmt.Sprintf(`match(%s, %s)`, sqlbuilder.Escape(expr), sb.Var(value)), nil
case qbtypes.FilterOperatorNotRegexp:
return fmt.Sprintf(`NOT match(%s, %s)`, sqlbuilder.Escape(expr), sb.Var(value)), nil
case qbtypes.FilterOperatorBetween:
values, ok := value.([]any)
if !ok || len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.Between(expr, values[0], values[1]), nil
case qbtypes.FilterOperatorNotBetween:
values, ok := value.([]any)
if !ok || len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.NotBetween(expr, values[0], values[1]), nil
case qbtypes.FilterOperatorIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
conditions := []string{}
for _, v := range values {
conditions = append(conditions, sb.E(expr, v))
}
return sb.Or(conditions...), nil
case qbtypes.FilterOperatorNotIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
conditions := []string{}
for _, v := range values {
conditions = append(conditions, sb.NE(expr, v))
}
return sb.And(conditions...), nil
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
// For composite columns, exists/not exists should be handled by the custom conditions
// If we reach here, it means no custom handler was provided
// We'll use a simple IS NOT NULL check on the expression
if operator == qbtypes.FilterOperatorExists {
return fmt.Sprintf("%s IS NOT NULL", expr), nil
}
return fmt.Sprintf("%s IS NULL", expr), nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported operator: %v", operator)
}
}

View File

@@ -0,0 +1,72 @@
package telemetrylogs
import (
"context"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
// ConditionStrategy defines the interface for building conditions.
// Different strategies handle different data access patterns:
// - DirectExpressionStrategy: Physical columns, map expressions, simple JSON paths
// - LambdaExpressionStrategy: JSON paths with array traversal requiring nested lambdas
// - CompositeColumnStrategy: Logical fields spanning multiple physical columns
type ConditionStrategy interface {
// BuildCondition builds a SQL condition for the given key, operator, and value.
BuildCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error)
// CanHandle returns true if this strategy can handle the given field key and column.
CanHandle(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, column *schema.Column) bool
}
// ConditionStrategyRouter determines which strategy to use for a given field.
type ConditionStrategyRouter struct {
strategies []ConditionStrategy
fm qbtypes.FieldMapper
}
// NewConditionStrategyRouter creates a new router with all available strategies.
func NewConditionStrategyRouter(fm qbtypes.FieldMapper) *ConditionStrategyRouter {
return &ConditionStrategyRouter{
strategies: []ConditionStrategy{
NewCompositeColumnStrategy(fm),
NewLambdaExpressionStrategy(fm),
NewDirectExpressionStrategy(fm),
},
fm: fm,
}
}
// Route determines which strategy to use and builds the condition.
func (r *ConditionStrategyRouter) Route(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
column, err := r.fm.ColumnFor(ctx, key)
if err != nil {
return "", err
}
// Find the first strategy that can handle this field
for _, strategy := range r.strategies {
if strategy.CanHandle(ctx, key, column) {
return strategy.BuildCondition(ctx, key, operator, value, sb)
}
}
// Fallback to direct expression if no strategy matches
directStrategy := NewDirectExpressionStrategy(r.fm)
return directStrategy.BuildCondition(ctx, key, operator, value, sb)
}

View File

@@ -0,0 +1,221 @@
package telemetrylogs
import (
"context"
"fmt"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
// DirectExpressionStrategy handles condition building for:
// 1. Physical Columns (direct access: severity_text, body, etc.)
// 2. Fields inside Physical Columns -> Expressions (like attribute_string['service.name'])
// 3. Simple JSON paths without array traversal
type DirectExpressionStrategy struct {
fm qbtypes.FieldMapper
}
func NewDirectExpressionStrategy(fm qbtypes.FieldMapper) *DirectExpressionStrategy {
return &DirectExpressionStrategy{fm: fm}
}
func (s *DirectExpressionStrategy) CanHandle(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, column *schema.Column) bool {
// Direct expressions can handle everything except:
// 1. JSON columns with array traversal (handled by LambdaExpressionStrategy)
// 2. Composite columns (handled by CompositeColumnStrategy)
// If it's a JSON column with array traversal, we need lambda
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled {
// Check if this requires array traversal (has JSONPlan with non-terminal nodes)
if len(key.JSONPlan) > 0 && !key.JSONPlan[0].IsTerminal {
return false
}
}
return true
}
func (s *DirectExpressionStrategy) BuildCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
column, err := s.fm.ColumnFor(ctx, key)
if err != nil {
return "", err
}
if operator.IsStringSearchOperator() {
value = querybuilder.FormatValueForContains(value)
}
tblFieldName, err := s.fm.FieldFor(ctx, key)
if err != nil {
return "", err
}
// Check if this is a body JSON search - either by FieldContext
if key.FieldContext == telemetrytypes.FieldContextBody {
tblFieldName, value = GetBodyJSONKey(ctx, key, operator, value)
}
tblFieldName, value = querybuilder.DataTypeCollisionHandledFieldName(key, value, tblFieldName, operator)
// Regular operators
return s.applyOperator(sb, tblFieldName, operator, value, column, key, ctx)
}
func (s *DirectExpressionStrategy) applyOperator(
sb *sqlbuilder.SelectBuilder,
tblFieldName string,
operator qbtypes.FilterOperator,
value any,
column *schema.Column,
key *telemetrytypes.TelemetryFieldKey,
ctx context.Context,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorEqual:
return sb.E(tblFieldName, value), nil
case qbtypes.FilterOperatorNotEqual:
return sb.NE(tblFieldName, value), nil
case qbtypes.FilterOperatorGreaterThan:
return sb.G(tblFieldName, value), nil
case qbtypes.FilterOperatorGreaterThanOrEq:
return sb.GE(tblFieldName, value), nil
case qbtypes.FilterOperatorLessThan:
return sb.LT(tblFieldName, value), nil
case qbtypes.FilterOperatorLessThanOrEq:
return sb.LE(tblFieldName, value), nil
case qbtypes.FilterOperatorLike:
return sb.Like(tblFieldName, value), nil
case qbtypes.FilterOperatorNotLike:
return sb.NotLike(tblFieldName, value), nil
case qbtypes.FilterOperatorILike:
return sb.ILike(tblFieldName, value), nil
case qbtypes.FilterOperatorNotILike:
return sb.NotILike(tblFieldName, value), nil
case qbtypes.FilterOperatorContains:
return sb.ILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorNotContains:
return sb.NotILike(tblFieldName, fmt.Sprintf("%%%s%%", value)), nil
case qbtypes.FilterOperatorRegexp:
return fmt.Sprintf(`match(%s, %s)`, sqlbuilder.Escape(tblFieldName), sb.Var(value)), nil
case qbtypes.FilterOperatorNotRegexp:
return fmt.Sprintf(`NOT match(%s, %s)`, sqlbuilder.Escape(tblFieldName), sb.Var(value)), nil
case qbtypes.FilterOperatorBetween:
values, ok := value.([]any)
if !ok || len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.Between(tblFieldName, values[0], values[1]), nil
case qbtypes.FilterOperatorNotBetween:
values, ok := value.([]any)
if !ok || len(values) != 2 {
return "", qbtypes.ErrBetweenValues
}
return sb.NotBetween(tblFieldName, values[0], values[1]), nil
case qbtypes.FilterOperatorIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
conditions := []string{}
for _, v := range values {
conditions = append(conditions, sb.E(tblFieldName, v))
}
return sb.Or(conditions...), nil
case qbtypes.FilterOperatorNotIn:
values, ok := value.([]any)
if !ok {
return "", qbtypes.ErrInValues
}
conditions := []string{}
for _, v := range values {
conditions = append(conditions, sb.NE(tblFieldName, v))
}
return sb.And(conditions...), nil
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
return s.buildExistsCondition(ctx, key, operator, column, sb)
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported operator: %v", operator)
}
}
func (s *DirectExpressionStrategy) buildExistsCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
column *schema.Column,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.BodyJSONQueryEnabled {
if operator == qbtypes.FilterOperatorExists {
return GetBodyJSONKeyForExists(ctx, key, operator, nil), nil
}
return "NOT " + GetBodyJSONKeyForExists(ctx, key, operator, nil), nil
}
var value any
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
tblFieldName, err := s.fm.FieldFor(ctx, key)
if err != nil {
return "", err
}
if operator == qbtypes.FilterOperatorExists {
return sb.IsNotNull(tblFieldName), nil
}
return sb.IsNull(tblFieldName), nil
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
value = ""
if operator == qbtypes.FilterOperatorExists {
return sb.NE(column.Name, value), nil
}
return sb.E(column.Name, value), nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
}
case schema.ColumnTypeEnumString:
value = ""
if operator == qbtypes.FilterOperatorExists {
return sb.NE(column.Name, value), nil
}
return sb.E(column.Name, value), nil
case schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
value = 0
if operator == qbtypes.FilterOperatorExists {
return sb.NE(column.Name, value), nil
}
return sb.E(column.Name, value), nil
case schema.ColumnTypeEnumMap:
keyType := column.Type.(schema.MapColumnType).KeyType
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
}
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
if key.Materialized {
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
}
if operator == qbtypes.FilterOperatorExists {
return sb.E(leftOperand, true), nil
}
return sb.NE(leftOperand, true), nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
}
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
}
}

View File

@@ -0,0 +1,57 @@
package telemetrylogs
import (
"context"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
// LambdaExpressionStrategy handles condition building for JSON fields
// that require nested lambda functions (arrayExists, arrayMap) because
// ClickHouse doesn't allow flat predicates for fields inside arrays.
type LambdaExpressionStrategy struct {
fm qbtypes.FieldMapper
}
func NewLambdaExpressionStrategy(fm qbtypes.FieldMapper) *LambdaExpressionStrategy {
return &LambdaExpressionStrategy{fm: fm}
}
func (s *LambdaExpressionStrategy) CanHandle(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, column *schema.Column) bool {
// Lambda expressions are needed for JSON columns with array traversal
if column.Type.GetType() != schema.ColumnTypeEnumJSON {
return false
}
if !querybuilder.BodyJSONQueryEnabled {
return false
}
// Check if this requires array traversal (has JSONPlan with non-terminal nodes)
if len(key.JSONPlan) == 0 {
return false
}
// If the first node is terminal, we can use direct expression
if key.JSONPlan[0].IsTerminal {
return false
}
return true
}
func (s *LambdaExpressionStrategy) BuildCondition(
ctx context.Context,
key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator,
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
valueType, value := InferDataType(value, operator, key)
jsonBuilder := NewJSONConditionBuilder(key, valueType)
return jsonBuilder.buildJSONCondition(operator, value, sb)
}

View File

@@ -0,0 +1,88 @@
package telemetrylogs
import (
"fmt"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/huandu/go-sqlbuilder"
)
// logicalFieldMetadata defines how a conceptual/logical field (like "body")
// is projected and how specific operators are translated into conditions.
//
// This allows us to treat fields that are backed by multiple physical columns
// (e.g. body, body_json, body_json_promoted) in a single, centralized place
// without scattering ad-hoc condition logic across builders.
type logicalFieldMetadata struct {
FieldExpressions []string
// Conditions optionally overrides how specific operators are translated
// into SQL conditions for this logical field. If an operator is not present
// here, we fall back to the standard condition-building flow.
Conditions qbtypes.OperatorToConditionRegistry
}
// buildBodyLogicalFieldMetadata creates the metadata for the "body" logical field.
// This function is used to avoid circular dependencies when condition functions
// need to access FieldExpressions.
func buildBodyLogicalFieldMetadata() logicalFieldMetadata {
// Capture field expressions in a local variable to avoid accessing the registry
// from within condition functions (which would create a circular dependency)
fieldExpressions := []string{
LogsV2BodyColumn,
}
if querybuilder.BodyJSONQueryEnabled {
fieldExpressions = append(fieldExpressions, jsonMergeExpr())
}
return logicalFieldMetadata{
FieldExpressions: fieldExpressions,
Conditions: qbtypes.OperatorToConditionRegistry{
// body Exists / body != ""
qbtypes.FilterOperatorExists: func(sb *sqlbuilder.SelectBuilder, _ any) (string, error) {
conditions := []string{
sb.Equal(emptyFunctionWrapped(LogsV2BodyColumn), false),
}
if !querybuilder.BodyJSONQueryEnabled {
return conditions[0], nil
}
conditions = append(conditions, sb.Equal(emptyFunctionWrapped(LogsV2BodyJSONColumn), false))
conditions = append(conditions, sb.Equal(emptyFunctionWrapped(LogsV2BodyPromotedColumn), false))
// Any of the representations being non-empty counts as "exists".
return sb.Or(conditions...), nil
},
// body Not Exists / body == ""
qbtypes.FilterOperatorNotExists: func(sb *sqlbuilder.SelectBuilder, _ any) (string, error) {
conditions := []string{
sb.Equal(emptyFunctionWrapped(LogsV2BodyColumn), true),
}
if !querybuilder.BodyJSONQueryEnabled {
return conditions[0], nil
}
conditions = append(conditions, sb.Equal(emptyFunctionWrapped(LogsV2BodyJSONColumn), true))
conditions = append(conditions, sb.Equal(emptyFunctionWrapped(LogsV2BodyPromotedColumn), true))
// All representations must be empty for "not exists".
return sb.And(conditions...), nil
},
qbtypes.FilterOperatorRegexp: func(sb *sqlbuilder.SelectBuilder, value any) (string, error) {
conditions := []string{}
for _, expression := range fieldExpressions {
conditions = append(conditions, fmt.Sprintf(`match(LOWER(%s), LOWER(%s))`, expression, sb.Var(value)))
}
return sb.Or(conditions...), nil
},
qbtypes.FilterOperatorNotRegexp: func(sb *sqlbuilder.SelectBuilder, value any) (string, error) {
conditions := []string{}
for _, expression := range fieldExpressions {
conditions = append(conditions, fmt.Sprintf(`NOT match(LOWER(%s), LOWER(%s))`, expression, sb.Var(value)))
}
return sb.Or(conditions...), nil
},
},
}
}
var logicalFieldRegistry = map[string]logicalFieldMetadata{
"body": buildBodyLogicalFieldMetadata(),
}