Compare commits

...

1 Commits

Author SHA1 Message Date
Piyush Singariya
199f8367b9 feat: update message as typehint in JSON Column 2026-03-10 20:53:47 +05:30
6 changed files with 48 additions and 41 deletions

View File

@@ -3,14 +3,12 @@ package telemetrylogs
import (
"context"
"fmt"
"slices"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"golang.org/x/exp/maps"
"github.com/huandu/go-sqlbuilder"
)
@@ -35,7 +33,7 @@ func (c *conditionBuilder) conditionFor(
return "", err
}
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled {
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled && key.FieldContext != telemetrytypes.FieldContextLog {
valueType, value := InferDataType(value, operator, key)
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
if err != nil {
@@ -253,19 +251,32 @@ func (c *conditionBuilder) ConditionFor(
return "", err
}
if !(key.FieldContext == telemetrytypes.FieldContextBody && querybuilder.BodyJSONQueryEnabled) && operator.AddDefaultExistsFilter() {
// skip adding exists filter for intrinsic fields
// with an exception for body json search
field, _ := c.fm.FieldFor(ctx, key)
if slices.Contains(maps.Keys(IntrinsicFields), field) && key.FieldContext != telemetrytypes.FieldContextBody {
// Skip adding exists filter for intrinsic fields i.e. Table level log context fields
buildExistCondition := operator.AddDefaultExistsFilter()
switch key.FieldContext {
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextScope:
// pass; No need to build exist condition for top level columns
// immidiately return
return condition, nil
case telemetrytypes.FieldContextResource, telemetrytypes.FieldContextAttribute:
buildExistCondition = buildExistCondition && true
case telemetrytypes.FieldContextBody:
// Querying JSON fields already account for Nullability of fields
// so additional exists checks are not needed
if querybuilder.BodyJSONQueryEnabled {
return condition, nil
}
buildExistCondition = buildExistCondition && true
}
if buildExistCondition {
existsCondition, err := c.conditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb)
if err != nil {
return "", err
}
return sb.And(condition, existsCondition), nil
}
return condition, nil
}

View File

@@ -127,7 +127,8 @@ func TestConditionFor(t *testing.T) {
{
name: "Contains operator - body",
key: telemetrytypes.TelemetryFieldKey{
Name: "body",
Name: "body",
FieldContext: telemetrytypes.FieldContextLog,
},
operator: qbtypes.FilterOperatorContains,
value: 521509198310,

View File

@@ -39,19 +39,20 @@ const (
BodyV2ColumnPrefix = constants.BodyV2ColumnPrefix
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
MessageSubColumn = "message"
MessageSubColumn = "body_v2.message"
bodySearchDefaultWarning = "body searches default to `body.message:string`. Use `body.<key>` to search a different field inside body"
)
var (
// mapping for body logical field to message sub column
// Here field context is log since message is a type hint and
// in a sense it is a direct column of log table and doesn't need any lambda expressions
// TODO(Piyush): Add description for detailed explanation of remapping of body to message
BodyLogicalFieldJSONMapping = &telemetrytypes.TelemetryFieldKey{
Name: MessageSubColumn,
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextBody,
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
JSONDataType: &telemetrytypes.String,
}
DefaultFullTextColumn = &telemetrytypes.TelemetryFieldKey{
Name: "body",
@@ -141,20 +142,14 @@ func bodyAliasExpression() string {
return fmt.Sprintf("%s as body", LogsV2BodyV2Column)
}
func init() {
// body logical field is mapped to message field in the body context that too only with String data type
err := BodyLogicalFieldJSONMapping.SetJSONAccessPlan(telemetrytypes.JSONColumnMetadata{
BaseColumn: LogsV2BodyV2Column,
PromotedColumn: LogsV2BodyPromotedColumn,
}, map[string][]telemetrytypes.JSONDataType{
MessageSubColumn: {telemetrytypes.String},
})
if err != nil {
panic(err)
}
func enrichMapsForJSONBodyEnabled() {
if querybuilder.BodyJSONQueryEnabled {
DefaultFullTextColumn = BodyLogicalFieldJSONMapping
IntrinsicFields["body"] = *BodyLogicalFieldJSONMapping
IntrinsicFields[MessageSubColumn] = *BodyLogicalFieldJSONMapping
}
}
func init() {
enrichMapsForJSONBodyEnabled()
}

View File

@@ -58,6 +58,7 @@ var (
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
}},
MessageSubColumn: {Name: MessageSubColumn, Type: schema.ColumnTypeString},
}
)
@@ -147,6 +148,8 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
}
return m.buildFieldForJSON(key)
case telemetrytypes.FieldContextLog:
return column.Name, nil
default:
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
}

View File

@@ -736,7 +736,7 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') IS NOT NULL OR dynamicElement(body_promoted.`message`, 'String') IS NOT NULL) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') IS NOT NULL) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
@@ -750,8 +750,8 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') = ? OR dynamicElement(body_promoted.`message`, 'String') = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
@@ -764,8 +764,8 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') = ? OR dynamicElement(body_promoted.`message`, 'String') = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "Iron Award", "Iron Award", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "Iron Award", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
@@ -778,8 +778,8 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
Limit: 10,
},
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (LOWER(dynamicElement(body_v2.`message`, 'String')) LIKE LOWER(?) OR LOWER(dynamicElement(body_promoted.`message`, 'String')) LIKE LOWER(?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "%Iron Award%", "%Iron Award%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (LOWER(dynamicElement(body_v2.`message`, 'String')) LIKE LOWER(?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "%Iron Award%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
},
expectedErr: nil,
},
@@ -811,9 +811,7 @@ func buildTestTelemetryMetadataStore(t *testing.T, promotedPaths ...string) *tel
promoted := false
split := strings.Split(path, telemetrytypes.ArraySep)
if path == "message" {
promoted = true
} else if slices.Contains(promotedPaths, split[0]) {
if slices.Contains(promotedPaths, split[0]) {
promoted = true
}
// Create a TelemetryFieldKey for each JSONDataType for this path
@@ -892,14 +890,13 @@ func jsonQueryTestUtil(_ *testing.T) (func(), func()) {
enable := func() {
querybuilder.BodyJSONQueryEnabled = true
DefaultFullTextColumn = BodyLogicalFieldJSONMapping
IntrinsicFields["body"] = *BodyLogicalFieldJSONMapping
enrichMapsForJSONBodyEnabled()
}
disable := func() {
querybuilder.BodyJSONQueryEnabled = false
DefaultFullTextColumn = &base
IntrinsicFields["body"] = base
delete(IntrinsicFields, MessageSubColumn)
}
return enable, disable

View File

@@ -907,8 +907,8 @@ func TestStmtBuilderBodyField(t *testing.T) {
},
enableBodyJSONQuery: true,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') IS NOT NULL) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message <> ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Warnings: []string{bodySearchDefaultWarning},
},
expectedErr: nil,
@@ -938,7 +938,7 @@ func TestStmtBuilderBodyField(t *testing.T) {
},
enableBodyJSONQuery: true,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Warnings: []string{bodySearchDefaultWarning},
},
@@ -969,7 +969,7 @@ func TestStmtBuilderBodyField(t *testing.T) {
},
enableBodyJSONQuery: true,
expected: qbtypes.Statement{
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (LOWER(dynamicElement(body_v2.`message`, 'String')) LIKE LOWER(?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND LOWER(body_v2.message) LIKE LOWER(?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{uint64(1747945619), uint64(1747983448), "%error%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
Warnings: []string{bodySearchDefaultWarning},
},