Compare commits

..

1 Commits

Author SHA1 Message Date
Cursor Agent
57ed82e121 Fix: Handle unrecognized child type in VisitPrimary and remove no-op boolean expressions 2026-03-11 09:25:08 +00:00
16 changed files with 118 additions and 165 deletions

View File

@@ -658,7 +658,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
// Setup mock metadata store
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
// Create query parser
queryParser := queryparser.New(settings)

View File

@@ -330,6 +330,9 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
v.errors = append(v.errors, fmt.Sprintf("unsupported value type: %s", valCtx.GetText()))
return ""
}
} else {
// Child is neither KeyContext nor ValueContext, return empty condition
return ""
}
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, FormatFullTextSearch(searchText), v.builder, v.startNs, v.endNs)
if err != nil {

View File

@@ -33,7 +33,7 @@ func (c *conditionBuilder) conditionFor(
return "", err
}
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled && !key.Materialized {
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled && key.FieldContext != telemetrytypes.FieldContextLog {
valueType, value := InferDataType(value, operator, key)
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
if err != nil {
@@ -52,7 +52,7 @@ func (c *conditionBuilder) conditionFor(
}
// Check if this is a body JSON search - either by FieldContext
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.BodyJSONQueryEnabled {
if key.FieldContext == telemetrytypes.FieldContextBody {
tblFieldName, value = GetBodyJSONKey(ctx, key, operator, value)
}
@@ -259,15 +259,14 @@ func (c *conditionBuilder) ConditionFor(
// immidiately return
return condition, nil
case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextAttribute:
buildExistCondition = buildExistCondition && true
// pass; buildExistCondition already has the correct value
case telemetrytypes.FieldContextBody:
// Querying JSON fields already account for Nullability of fields
// so additional exists checks are not needed
if querybuilder.BodyJSONQueryEnabled {
return condition, nil
}
buildExistCondition = buildExistCondition && true
// pass; buildExistCondition already has the correct value
}
if buildExistCondition {

View File

@@ -3,7 +3,6 @@ package telemetrylogs
import (
"fmt"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz-otel-collector/constants"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -40,22 +39,20 @@ const (
BodyV2ColumnPrefix = constants.BodyV2ColumnPrefix
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
MessageBodyField = "message"
MessageSubColumn = "body_v2.message"
bodySearchDefaultWarning = "body searches default to `body.message:string`. Use `body.<key>` to search a different field inside body"
)
var (
// BodyLogicalFieldJSONMapping is the canonical key for the "message" type hint.
// It lives under body (FieldContextBody) but Materialized=true signals the field
// mapper to access it as a direct sub-column (body_v2.message) rather than via
// dynamicElement(). Its name is the bare field name ("message"), not the column path.
// mapping for body logical field to message sub column
// Here field context is log since message is a type hint and
// in a sense it is a direct column of log table and doesn't need any lambda expressions
// TODO(Piyush): Add description for detailed explanation of remapping of body to message
BodyLogicalFieldJSONMapping = &telemetrytypes.TelemetryFieldKey{
Name: MessageBodyField,
Name: MessageSubColumn,
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextBody,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
}
DefaultFullTextColumn = &telemetrytypes.TelemetryFieldKey{
Name: "body",
@@ -149,15 +146,7 @@ func enrichMapsForJSONBodyEnabled() {
if querybuilder.BodyJSONQueryEnabled {
DefaultFullTextColumn = BodyLogicalFieldJSONMapping
IntrinsicFields["body"] = *BodyLogicalFieldJSONMapping
// Register all key names that should resolve to the message type-hint column so
// QB can look them up directly: bare "message" and qualified "body_v2.message".
IntrinsicFields[MessageSubColumn] = *BodyLogicalFieldJSONMapping
IntrinsicFields[MessageBodyField] = *BodyLogicalFieldJSONMapping
logsV2Columns[MessageSubColumn] = &schema.Column{
Name: MessageSubColumn,
Type: schema.ColumnTypeString,
}
}
}

View File

@@ -58,6 +58,7 @@ var (
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}},
MessageSubColumn: {Name: MessageSubColumn, Type: schema.ColumnTypeString},
}
)
@@ -88,13 +89,6 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
return logsV2Columns["attributes_bool"], nil
}
case telemetrytypes.FieldContextBody:
// Type hints (Materialized=true) have a direct physical sub-column in body_v2.
// Return a synthetic String column so the condition builder uses the direct path
// instead of the JSON condition builder (which expects a JSONPlan).
if key.Materialized {
// Type hints have a direct physical sub-column in body_v2 (e.g. body_v2.message).
return logsV2Columns[fmt.Sprintf("%s.%s", LogsV2BodyV2Column, key.Name)], nil
}
// Body context is for JSON body fields
// Use body_v2 if feature flag is enabled
if querybuilder.BodyJSONQueryEnabled {
@@ -255,37 +249,34 @@ func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (
node := plan[0]
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
// TODO(Piyush): Promoted path logic commented out. Materialized now means type hint
// promotion will be extracted from key field evolution
// (direct sub-column access), not a promoted body_promoted.* column.
// if key.Materialized {
// if len(plan) < 2 {
// return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
// "plan length is less than 2 for promoted path: %s", key.Name)
// }
if key.Materialized {
if len(plan) < 2 {
return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
"plan length is less than 2 for promoted path: %s", key.Name)
}
// node := plan[1]
// promotedExpr := fmt.Sprintf(
// "dynamicElement(%s, '%s')",
// node.FieldPath(),
// node.TerminalConfig.ElemType.StringValue(),
// )
node := plan[1]
promotedExpr := fmt.Sprintf(
"dynamicElement(%s, '%s')",
node.FieldPath(),
node.TerminalConfig.ElemType.StringValue(),
)
// // dynamicElement returns NULL for scalar types or an empty array for array types.
// if node.TerminalConfig.ElemType.IsArray {
// expr = fmt.Sprintf(
// "if(length(%s) > 0, %s, %s)",
// promotedExpr,
// promotedExpr,
// expr,
// )
// } else {
// // promoted column first then body_json column
// // TODO(Piyush): Change this in future for better performance
// expr = fmt.Sprintf("coalesce(%s, %s)", promotedExpr, expr)
// }
// dynamicElement returns NULL for scalar types or an empty array for array types.
if node.TerminalConfig.ElemType.IsArray {
expr = fmt.Sprintf(
"if(length(%s) > 0, %s, %s)",
promotedExpr,
promotedExpr,
expr,
)
} else {
// promoted column first then body_json column
// TODO(Piyush): Change this in future for better performance
expr = fmt.Sprintf("coalesce(%s, %s)", promotedExpr, expr)
}
// }
}
return expr, nil
}

View File

@@ -96,9 +96,6 @@ func TestStmtBuilderTimeSeriesBodyGroupByJSON(t *testing.T) {
}
}
/* Promoted path tests commented out — Materialized now means type hint (direct sub-column),
not a body_promoted.* column. These tests assumed the old coalesce(body_promoted.x, body_v2.x) path.
func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
enable()
@@ -159,7 +156,6 @@ func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) {
})
}
}
*/
func TestStatementBuilderListQueryBodyHas(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
@@ -390,8 +386,8 @@ func TestStatementBuilderListQueryBody(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), float64(2), float64(4), float64(2), float64(4), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{float64(2), float64(4), float64(2), float64(4), uint64(1747945619), uint64(1747983448), float64(2), float64(4), float64(2), float64(4), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
@@ -404,8 +400,8 @@ func TestStatementBuilderListQueryBody(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), float64(2), float64(4), float64(2), float64(4), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{float64(2), float64(4), float64(2), float64(4), uint64(1747945619), uint64(1747983448), float64(2), float64(4), float64(2), float64(4), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
@@ -529,9 +525,6 @@ func TestStatementBuilderListQueryBody(t *testing.T) {
}
}
/* Promoted path list-query tests commented out — Materialized now means type hint
(direct sub-column access), not a body_promoted.* column.
func TestStatementBuilderListQueryBodyPromoted(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
enable()
@@ -707,7 +700,6 @@ func TestStatementBuilderListQueryBodyPromoted(t *testing.T) {
})
}
}
*/
func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
enable, disable := jsonQueryTestUtil(t)
@@ -715,6 +707,19 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
defer disable()
statementBuilder := buildJSONTestStatementBuilder(t)
indexed := []*telemetrytypes.TelemetryFieldKey{
{
Name: "message",
Indexes: []telemetrytypes.JSONDataTypeIndex{
{
Type: telemetrytypes.String,
ColumnExpression: "body_promoted.message",
IndexExpression: "(lower(assumeNotNull(dynamicElement(body_promoted.message, 'String'))))",
},
},
},
}
testAddIndexedPaths(t, statementBuilder, indexed...)
cases := []struct {
name string
requestType qbtypes.RequestType
@@ -727,12 +732,12 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "message Exists"},
Filter: &qbtypes.Filter{Expression: "body.message Exists"},
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message <> ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') IS NOT NULL) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
@@ -745,7 +750,7 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
@@ -759,21 +764,21 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "Iron Award", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
{
name: "message contains 'Iron Award'",
name: "body.message contains 'Iron Award'",
requestType: qbtypes.RequestTypeRaw,
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
Signal: telemetrytypes.SignalLogs,
Filter: &qbtypes.Filter{Expression: "message Contains 'Iron Award'"},
Filter: &qbtypes.Filter{Expression: "body.message Contains 'Iron Award'"},
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND LOWER(body_v2.message) LIKE LOWER(?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (LOWER(dynamicElement(body_v2.`message`, 'String')) LIKE LOWER(?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "%Iron Award%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
@@ -799,7 +804,8 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
}
func buildTestTelemetryMetadataStore(t *testing.T, promotedPaths ...string) *telemetrytypestest.MockMetadataStore {
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(IntrinsicFields)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
types, _ := telemetrytypes.TestJSONTypeSet()
for path, jsonTypes := range types {
promoted := false
@@ -836,14 +842,11 @@ func buildJSONTestStatementBuilder(t *testing.T, promotedPaths ...string) *logQu
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
resourceFilterFM := resourcefilter.NewFieldMapper()
resourceFilterCB := resourcefilter.NewConditionBuilder(resourceFilterFM)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
resourceFilterFM,
resourceFilterCB,
fm,
cb,
mockMetadataStore,
DefaultFullTextColumn,
GetBodyJSONKey,
@@ -863,6 +866,19 @@ func buildJSONTestStatementBuilder(t *testing.T, promotedPaths ...string) *logQu
return statementBuilder
}
func testAddIndexedPaths(t *testing.T, statementBuilder *logQueryStatementBuilder, telemetryFieldKeys ...*telemetrytypes.TelemetryFieldKey) {
mockMetadataStore := statementBuilder.metadataStore.(*telemetrytypestest.MockMetadataStore)
for _, key := range telemetryFieldKeys {
if strings.Contains(key.Name, telemetrytypes.ArraySep) || strings.Contains(key.Name, telemetrytypes.ArrayAnyIndex) {
t.Fatalf("array paths are not supported: %s", key.Name)
}
for _, storedKey := range mockMetadataStore.KeysMap[key.Name] {
storedKey.Indexes = append(storedKey.Indexes, key.Indexes...)
}
}
}
func jsonQueryTestUtil(_ *testing.T) (func(), func()) {
querybuilder.BodyJSONQueryEnabled = true
base := telemetrytypes.TelemetryFieldKey{
@@ -881,8 +897,6 @@ func jsonQueryTestUtil(_ *testing.T) (func(), func()) {
DefaultFullTextColumn = &base
IntrinsicFields["body"] = base
delete(IntrinsicFields, MessageSubColumn)
delete(IntrinsicFields, MessageBodyField)
delete(logsV2Columns, MessageSubColumn)
}
return enable, disable

View File

@@ -18,7 +18,7 @@ import (
func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation] {
fm := resourcefilter.NewFieldMapper()
cb := resourcefilter.NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
keysMap := buildCompleteFieldKeyMap()
for _, keys := range keysMap {
for _, key := range keys {
@@ -196,7 +196,7 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
},
}
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
@@ -316,7 +316,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
},
}
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
@@ -456,7 +456,7 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
},
}
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
@@ -532,7 +532,7 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
},
}
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
@@ -627,7 +627,7 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
},
}
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
@@ -850,7 +850,7 @@ func TestAdjustKey(t *testing.T) {
}
fm := NewFieldMapper()
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()
cb := NewConditionBuilder(fm)
@@ -1005,7 +1005,7 @@ func TestStmtBuilderBodyField(t *testing.T) {
disable()
}
// build the key map after enabling/disabling body JSON query
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)

View File

@@ -939,6 +939,10 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
}
}
// add intrinsic fields to the map
for fieldName, key := range IntrinsicFields {
keysMap[fieldName] = append(keysMap[fieldName], &key)
}
return keysMap
}

View File

@@ -540,7 +540,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
}
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
setOfKeys[key.Text()] = key
setOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()] = key
}
if rows.Err() != nil {
@@ -566,15 +566,11 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
if found {
if field, exists := telemetrylogs.IntrinsicFields[key]; exists {
// Register by physical name only once and only when it differs from the
// logical key — if they are the same, the always-register below covers it.
if _, added := setOfKeys[field.Text()]; !added {
mapOfKeys[field.Text()] = append(mapOfKeys[field.Name], &field)
if _, added := setOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
mapOfKeys[field.Name] = append(mapOfKeys[field.Name], &field)
// intrinsic field can be a logical field mapped to a different physical location
mapOfKeys[key] = append(mapOfKeys[key], &field)
}
// Always register the logical key so that every alias in IntrinsicFields
// (e.g. "message", "body_v2.message") independently resolves to the same
// physical field in the keys map.
mapOfKeys[key] = append(mapOfKeys[key], &field)
continue
}
}

View File

@@ -159,7 +159,7 @@ func TestStatementBuilder(t *testing.T) {
fm := telemetrymetrics.NewFieldMapper()
cb := telemetrymetrics.NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json")
if err != nil {
t.Fatalf("failed to load field keys: %v", err)

View File

@@ -221,7 +221,7 @@ func TestStatementBuilder(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json")
if err != nil {
t.Fatalf("failed to load field keys: %v", err)

View File

@@ -18,7 +18,7 @@ import (
func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.TraceAggregation] {
fm := resourcefilter.NewFieldMapper()
cb := resourcefilter.NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
return resourcefilter.NewTraceResourceFilterStatementBuilder(
@@ -368,7 +368,7 @@ func TestStatementBuilder(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
@@ -664,7 +664,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
@@ -771,7 +771,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = c.keysMap
if mockMetadataStore.KeysMap == nil {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
@@ -927,7 +927,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
@@ -1145,7 +1145,7 @@ func TestAdjustKey(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -1420,7 +1420,7 @@ func TestAdjustKeys(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()

View File

@@ -388,7 +388,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
@@ -504,7 +504,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)

View File

@@ -19,7 +19,7 @@ func TestTraceTimeRangeOptimization(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
mockMetadataStore.KeysMap["trace_id"] = []*telemetrytypes.TelemetryFieldKey{{

View File

@@ -40,7 +40,7 @@ type TelemetryFieldKey struct {
JSONDataType *JSONDataType `json:"-"`
JSONPlan JSONAccessPlan `json:"-"`
Indexes []JSONDataTypeIndex `json:"-"`
Materialized bool `json:"-"` // refers to type hint in case of JSON column fields
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
}
func (f *TelemetryFieldKey) KeyNameContainsArray() bool {

View File

@@ -21,20 +21,10 @@ type MockMetadataStore struct {
PromotedPathsMap map[string]bool
LogsJSONIndexesMap map[string][]schemamigrator.Index
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
// StaticFields holds signal-specific intrinsic field definitions (e.g. telemetrylogs.IntrinsicFields).
// It is injected into GetKeys / GetKey results, mirroring what the real metadata store does when
// it reads telemetrylogs.IntrinsicFields directly. Callers pass their package's IntrinsicFields
// map at construction time to avoid a circular import.
StaticFields map[string]telemetrytypes.TelemetryFieldKey
}
// NewMockMetadataStore creates a new instance of MockMetadataStore with initialized maps.
// Pass the signal-specific intrinsic fields (e.g. telemetrylogs.IntrinsicFields) so the mock
// mirrors what the real metadata store does when injecting those definitions into key results.
func NewMockMetadataStore(intrinsicFields map[string]telemetrytypes.TelemetryFieldKey) *MockMetadataStore {
if intrinsicFields == nil {
intrinsicFields = make(map[string]telemetrytypes.TelemetryFieldKey)
}
// NewMockMetadataStore creates a new instance of MockMetadataStore with initialized maps
func NewMockMetadataStore() *MockMetadataStore {
return &MockMetadataStore{
KeysMap: make(map[string][]*telemetrytypes.TelemetryFieldKey),
RelatedValuesMap: make(map[string][]string),
@@ -44,7 +34,6 @@ func NewMockMetadataStore(intrinsicFields map[string]telemetrytypes.TelemetryFie
PromotedPathsMap: make(map[string]bool),
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
StaticFields: intrinsicFields,
}
}
@@ -58,8 +47,9 @@ func (m *MockMetadataStore) GetKeys(ctx context.Context, fieldKeySelector *telem
return m.KeysMap, true, nil
}
// Apply selector logic from KeysMap
// Apply selector logic
for name, keys := range m.KeysMap {
// Check if name matches
if matchesName(fieldKeySelector, name) {
filteredKeys := []*telemetrytypes.TelemetryFieldKey{}
for _, key := range keys {
@@ -73,31 +63,6 @@ func (m *MockMetadataStore) GetKeys(ctx context.Context, fieldKeySelector *telem
}
}
// Inject StaticFields (e.g. IntrinsicFields), mirroring the real metadata store.
// StaticFields take precedence over KeysMap entries for the same logical key.
// Each logical key always gets its own entry (so "message" and "body.message" both
// resolve to the IntrinsicField independently). The physical name is registered
// only once to avoid duplicate entries in that slot.
injectedPhysKeys := make(map[string]bool)
for key, field := range m.StaticFields {
if !matchesName(fieldKeySelector, key) {
continue
}
fieldCopy := field
// Always register the logical key (override any KeysMap entries).
result[key] = []*telemetrytypes.TelemetryFieldKey{&fieldCopy}
// Register by physical name only once when it differs from the logical key.
if key != fieldCopy.Name {
physKey := fieldCopy.Name + ";" + fieldCopy.FieldContext.StringValue() + ";" + fieldCopy.FieldDataType.StringValue()
if !injectedPhysKeys[physKey] {
injectedPhysKeys[physKey] = true
result[fieldCopy.Name] = append(result[fieldCopy.Name], &fieldCopy)
}
}
}
return result, true, nil
}
@@ -144,7 +109,7 @@ func (m *MockMetadataStore) GetKey(ctx context.Context, fieldKeySelector *teleme
result := []*telemetrytypes.TelemetryFieldKey{}
// Find keys matching the selector from KeysMap
// Find keys matching the selector
for name, keys := range m.KeysMap {
if matchesName(fieldKeySelector, name) {
for _, key := range keys {
@@ -155,14 +120,6 @@ func (m *MockMetadataStore) GetKey(ctx context.Context, fieldKeySelector *teleme
}
}
// Add matching StaticFields (e.g. IntrinsicFields), same as the real metadata store does
for key, field := range m.StaticFields {
if fieldKeySelector.Name == "" || strings.Contains(key, fieldKeySelector.Name) {
fieldCopy := field
result = append(result, &fieldCopy)
}
}
return result, nil
}
@@ -334,7 +291,7 @@ func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, queryTime
return result, nil
}
// FetchTemporalityAndTypeMulti fetches the temporality and type for multiple metrics
// FetchTemporalityMulti fetches the temporality for multiple metrics
func (m *MockMetadataStore) FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error) {
temporalities := make(map[string]metrictypes.Temporality)
types := make(map[string]metrictypes.Type)