mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-12 00:12:02 +00:00
Compare commits
3 Commits
merge-json
...
fix/messag
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
66e800aa2e | ||
|
|
3d6c0c6764 | ||
|
|
d9a1d64def |
@@ -658,7 +658,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
|
||||
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, &queryMatcherAny{})
|
||||
|
||||
// Setup mock metadata store
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
|
||||
// Create query parser
|
||||
queryParser := queryparser.New(settings)
|
||||
|
||||
@@ -33,7 +33,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
return "", err
|
||||
}
|
||||
|
||||
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled && key.FieldContext != telemetrytypes.FieldContextLog {
|
||||
if column.Type.GetType() == schema.ColumnTypeEnumJSON && querybuilder.BodyJSONQueryEnabled && !key.Materialized {
|
||||
valueType, value := InferDataType(value, operator, key)
|
||||
cond, err := NewJSONConditionBuilder(key, valueType).buildJSONCondition(operator, value, sb)
|
||||
if err != nil {
|
||||
@@ -52,7 +52,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
}
|
||||
|
||||
// Check if this is a body JSON search - either by FieldContext
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody {
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.BodyJSONQueryEnabled {
|
||||
tblFieldName, value = GetBodyJSONKey(ctx, key, operator, value)
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package telemetrylogs
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
@@ -39,20 +40,22 @@ const (
|
||||
|
||||
BodyV2ColumnPrefix = constants.BodyV2ColumnPrefix
|
||||
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
|
||||
MessageBodyField = "message"
|
||||
MessageSubColumn = "body_v2.message"
|
||||
bodySearchDefaultWarning = "body searches default to `body.message:string`. Use `body.<key>` to search a different field inside body"
|
||||
)
|
||||
|
||||
var (
|
||||
// mapping for body logical field to message sub column
|
||||
// Here field context is log since message is a type hint and
|
||||
// in a sense it is a direct column of log table and doesn't need any lambda expressions
|
||||
// TODO(Piyush): Add description for detailed explanation of remapping of body to message
|
||||
// BodyLogicalFieldJSONMapping is the canonical key for the "message" type hint.
|
||||
// It lives under body (FieldContextBody) but Materialized=true signals the field
|
||||
// mapper to access it as a direct sub-column (body_v2.message) rather than via
|
||||
// dynamicElement(). Its name is the bare field name ("message"), not the column path.
|
||||
BodyLogicalFieldJSONMapping = &telemetrytypes.TelemetryFieldKey{
|
||||
Name: MessageSubColumn,
|
||||
Name: MessageBodyField,
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
}
|
||||
DefaultFullTextColumn = &telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body",
|
||||
@@ -146,7 +149,15 @@ func enrichMapsForJSONBodyEnabled() {
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
DefaultFullTextColumn = BodyLogicalFieldJSONMapping
|
||||
IntrinsicFields["body"] = *BodyLogicalFieldJSONMapping
|
||||
|
||||
// Register all key names that should resolve to the message type-hint column so
|
||||
// QB can look them up directly: bare "message" and qualified "body_v2.message".
|
||||
IntrinsicFields[MessageSubColumn] = *BodyLogicalFieldJSONMapping
|
||||
IntrinsicFields[MessageBodyField] = *BodyLogicalFieldJSONMapping
|
||||
logsV2Columns[MessageSubColumn] = &schema.Column{
|
||||
Name: MessageSubColumn,
|
||||
Type: schema.ColumnTypeString,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -58,7 +58,6 @@ var (
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}},
|
||||
MessageSubColumn: {Name: MessageSubColumn, Type: schema.ColumnTypeString},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -89,6 +88,13 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
|
||||
return logsV2Columns["attributes_bool"], nil
|
||||
}
|
||||
case telemetrytypes.FieldContextBody:
|
||||
// Type hints (Materialized=true) have a direct physical sub-column in body_v2.
|
||||
// Return a synthetic String column so the condition builder uses the direct path
|
||||
// instead of the JSON condition builder (which expects a JSONPlan).
|
||||
if key.Materialized {
|
||||
// Type hints have a direct physical sub-column in body_v2 (e.g. body_v2.message).
|
||||
return logsV2Columns[fmt.Sprintf("%s.%s", LogsV2BodyV2Column, key.Name)], nil
|
||||
}
|
||||
// Body context is for JSON body fields
|
||||
// Use body_v2 if feature flag is enabled
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
@@ -249,34 +255,37 @@ func (m *fieldMapper) buildFieldForJSON(key *telemetrytypes.TelemetryFieldKey) (
|
||||
node := plan[0]
|
||||
|
||||
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
|
||||
if key.Materialized {
|
||||
if len(plan) < 2 {
|
||||
return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
|
||||
"plan length is less than 2 for promoted path: %s", key.Name)
|
||||
}
|
||||
// TODO(Piyush): Promoted path logic commented out. Materialized now means type hint
|
||||
// promotion will be extracted from key field evolution
|
||||
// (direct sub-column access), not a promoted body_promoted.* column.
|
||||
// if key.Materialized {
|
||||
// if len(plan) < 2 {
|
||||
// return "", errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
|
||||
// "plan length is less than 2 for promoted path: %s", key.Name)
|
||||
// }
|
||||
|
||||
node := plan[1]
|
||||
promotedExpr := fmt.Sprintf(
|
||||
"dynamicElement(%s, '%s')",
|
||||
node.FieldPath(),
|
||||
node.TerminalConfig.ElemType.StringValue(),
|
||||
)
|
||||
// node := plan[1]
|
||||
// promotedExpr := fmt.Sprintf(
|
||||
// "dynamicElement(%s, '%s')",
|
||||
// node.FieldPath(),
|
||||
// node.TerminalConfig.ElemType.StringValue(),
|
||||
// )
|
||||
|
||||
// dynamicElement returns NULL for scalar types or an empty array for array types.
|
||||
if node.TerminalConfig.ElemType.IsArray {
|
||||
expr = fmt.Sprintf(
|
||||
"if(length(%s) > 0, %s, %s)",
|
||||
promotedExpr,
|
||||
promotedExpr,
|
||||
expr,
|
||||
)
|
||||
} else {
|
||||
// promoted column first then body_json column
|
||||
// TODO(Piyush): Change this in future for better performance
|
||||
expr = fmt.Sprintf("coalesce(%s, %s)", promotedExpr, expr)
|
||||
}
|
||||
// // dynamicElement returns NULL for scalar types or an empty array for array types.
|
||||
// if node.TerminalConfig.ElemType.IsArray {
|
||||
// expr = fmt.Sprintf(
|
||||
// "if(length(%s) > 0, %s, %s)",
|
||||
// promotedExpr,
|
||||
// promotedExpr,
|
||||
// expr,
|
||||
// )
|
||||
// } else {
|
||||
// // promoted column first then body_json column
|
||||
// // TODO(Piyush): Change this in future for better performance
|
||||
// expr = fmt.Sprintf("coalesce(%s, %s)", promotedExpr, expr)
|
||||
// }
|
||||
|
||||
}
|
||||
// }
|
||||
|
||||
return expr, nil
|
||||
}
|
||||
|
||||
@@ -96,6 +96,9 @@ func TestStmtBuilderTimeSeriesBodyGroupByJSON(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Promoted path tests commented out — Materialized now means type hint (direct sub-column),
|
||||
not a body_promoted.* column. These tests assumed the old coalesce(body_promoted.x, body_v2.x) path.
|
||||
|
||||
func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) {
|
||||
enable, disable := jsonQueryTestUtil(t)
|
||||
enable()
|
||||
@@ -156,6 +159,7 @@ func TestStmtBuilderTimeSeriesBodyGroupByPromoted(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func TestStatementBuilderListQueryBodyHas(t *testing.T) {
|
||||
enable, disable := jsonQueryTestUtil(t)
|
||||
@@ -386,8 +390,8 @@ func TestStatementBuilderListQueryBody(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{float64(2), float64(4), float64(2), float64(4), uint64(1747945619), uint64(1747983448), float64(2), float64(4), float64(2), float64(4), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) BETWEEN ? AND ?, arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), float64(2), float64(4), float64(2), float64(4), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
@@ -400,8 +404,8 @@ func TestStatementBuilderListQueryBody(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{float64(2), float64(4), float64(2), float64(4), uint64(1747945619), uint64(1747983448), float64(2), float64(4), float64(2), float64(4), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (arrayExists(`body_v2.education`-> (arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), dynamicElement(`body_v2.education`.`awards`, 'Array(JSON(max_dynamic_types=8, max_dynamic_paths=0))')) OR arrayExists(`body_v2.education[].awards`-> toFloat64(dynamicElement(`body_v2.education[].awards`.`semester`, 'Int64')) IN (?, ?), arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), dynamicElement(`body_v2.education`.`awards`, 'Array(Dynamic)'))))), dynamicElement(body_v2.`education`, 'Array(JSON(max_dynamic_types=16, max_dynamic_paths=0))'))) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), float64(2), float64(4), float64(2), float64(4), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
@@ -525,6 +529,9 @@ func TestStatementBuilderListQueryBody(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Promoted path list-query tests commented out — Materialized now means type hint
|
||||
(direct sub-column access), not a body_promoted.* column.
|
||||
|
||||
func TestStatementBuilderListQueryBodyPromoted(t *testing.T) {
|
||||
enable, disable := jsonQueryTestUtil(t)
|
||||
enable()
|
||||
@@ -700,6 +707,7 @@ func TestStatementBuilderListQueryBodyPromoted(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
|
||||
enable, disable := jsonQueryTestUtil(t)
|
||||
@@ -707,19 +715,6 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
|
||||
defer disable()
|
||||
|
||||
statementBuilder := buildJSONTestStatementBuilder(t)
|
||||
indexed := []*telemetrytypes.TelemetryFieldKey{
|
||||
{
|
||||
Name: "message",
|
||||
Indexes: []telemetrytypes.JSONDataTypeIndex{
|
||||
{
|
||||
Type: telemetrytypes.String,
|
||||
ColumnExpression: "body_promoted.message",
|
||||
IndexExpression: "(lower(assumeNotNull(dynamicElement(body_promoted.message, 'String'))))",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testAddIndexedPaths(t, statementBuilder, indexed...)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -732,12 +727,12 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Filter: &qbtypes.Filter{Expression: "body.message Exists"},
|
||||
Filter: &qbtypes.Filter{Expression: "message Exists"},
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') IS NOT NULL) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message <> ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
@@ -750,7 +745,7 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -764,21 +759,21 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (dynamicElement(body_v2.`message`, 'String') = ?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND body_v2.message = ? AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "Iron Award", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "body.message contains 'Iron Award'",
|
||||
name: "message contains 'Iron Award'",
|
||||
requestType: qbtypes.RequestTypeRaw,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]{
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
Filter: &qbtypes.Filter{Expression: "body.message Contains 'Iron Award'"},
|
||||
Filter: &qbtypes.Filter{Expression: "message Contains 'Iron Award'"},
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND (LOWER(dynamicElement(body_v2.`message`, 'String')) LIKE LOWER(?)) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE true AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?) SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, scope_name, scope_version, body_v2 as body, attributes_string, attributes_number, attributes_bool, resources_string, scope_string FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter) AND LOWER(body_v2.message) LIKE LOWER(?) AND timestamp >= ? AND ts_bucket_start >= ? AND timestamp < ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{uint64(1747945619), uint64(1747983448), "%Iron Award%", "1747947419000000000", uint64(1747945619), "1747983448000000000", uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -804,8 +799,7 @@ func TestStatementBuilderListQueryBodyMessage(t *testing.T) {
|
||||
}
|
||||
|
||||
func buildTestTelemetryMetadataStore(t *testing.T, promotedPaths ...string) *telemetrytypestest.MockMetadataStore {
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(IntrinsicFields)
|
||||
types, _ := telemetrytypes.TestJSONTypeSet()
|
||||
for path, jsonTypes := range types {
|
||||
promoted := false
|
||||
@@ -842,11 +836,14 @@ func buildJSONTestStatementBuilder(t *testing.T, promotedPaths ...string) *logQu
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
resourceFilterFM := resourcefilter.NewFieldMapper()
|
||||
resourceFilterCB := resourcefilter.NewConditionBuilder(resourceFilterFM)
|
||||
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
resourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
|
||||
instrumentationtest.New().ToProviderSettings(),
|
||||
fm,
|
||||
cb,
|
||||
resourceFilterFM,
|
||||
resourceFilterCB,
|
||||
mockMetadataStore,
|
||||
DefaultFullTextColumn,
|
||||
GetBodyJSONKey,
|
||||
@@ -866,19 +863,6 @@ func buildJSONTestStatementBuilder(t *testing.T, promotedPaths ...string) *logQu
|
||||
return statementBuilder
|
||||
}
|
||||
|
||||
func testAddIndexedPaths(t *testing.T, statementBuilder *logQueryStatementBuilder, telemetryFieldKeys ...*telemetrytypes.TelemetryFieldKey) {
|
||||
mockMetadataStore := statementBuilder.metadataStore.(*telemetrytypestest.MockMetadataStore)
|
||||
for _, key := range telemetryFieldKeys {
|
||||
if strings.Contains(key.Name, telemetrytypes.ArraySep) || strings.Contains(key.Name, telemetrytypes.ArrayAnyIndex) {
|
||||
t.Fatalf("array paths are not supported: %s", key.Name)
|
||||
}
|
||||
|
||||
for _, storedKey := range mockMetadataStore.KeysMap[key.Name] {
|
||||
storedKey.Indexes = append(storedKey.Indexes, key.Indexes...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func jsonQueryTestUtil(_ *testing.T) (func(), func()) {
|
||||
querybuilder.BodyJSONQueryEnabled = true
|
||||
base := telemetrytypes.TelemetryFieldKey{
|
||||
@@ -897,6 +881,8 @@ func jsonQueryTestUtil(_ *testing.T) (func(), func()) {
|
||||
DefaultFullTextColumn = &base
|
||||
IntrinsicFields["body"] = base
|
||||
delete(IntrinsicFields, MessageSubColumn)
|
||||
delete(IntrinsicFields, MessageBodyField)
|
||||
delete(logsV2Columns, MessageSubColumn)
|
||||
}
|
||||
|
||||
return enable, disable
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation] {
|
||||
fm := resourcefilter.NewFieldMapper()
|
||||
cb := resourcefilter.NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
keysMap := buildCompleteFieldKeyMap()
|
||||
for _, keys := range keysMap {
|
||||
for _, key := range keys {
|
||||
@@ -196,7 +196,7 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
@@ -316,7 +316,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
@@ -456,7 +456,7 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
@@ -532,7 +532,7 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
@@ -627,7 +627,7 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
@@ -850,7 +850,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()
|
||||
cb := NewConditionBuilder(fm)
|
||||
|
||||
@@ -1005,7 +1005,7 @@ func TestStmtBuilderBodyField(t *testing.T) {
|
||||
disable()
|
||||
}
|
||||
// build the key map after enabling/disabling body JSON query
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
|
||||
@@ -939,10 +939,6 @@ func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
|
||||
}
|
||||
}
|
||||
|
||||
// add intrinsic fields to the map
|
||||
for fieldName, key := range IntrinsicFields {
|
||||
keysMap[fieldName] = append(keysMap[fieldName], &key)
|
||||
}
|
||||
return keysMap
|
||||
}
|
||||
|
||||
|
||||
@@ -540,7 +540,7 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
|
||||
}
|
||||
|
||||
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
|
||||
setOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()] = key
|
||||
setOfKeys[key.Text()] = key
|
||||
}
|
||||
|
||||
if rows.Err() != nil {
|
||||
@@ -566,11 +566,15 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
|
||||
|
||||
if found {
|
||||
if field, exists := telemetrylogs.IntrinsicFields[key]; exists {
|
||||
if _, added := setOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
|
||||
mapOfKeys[field.Name] = append(mapOfKeys[field.Name], &field)
|
||||
// intrinsic field can be a logical field mapped to a different physical location
|
||||
mapOfKeys[key] = append(mapOfKeys[key], &field)
|
||||
// Register by physical name only once and only when it differs from the
|
||||
// logical key — if they are the same, the always-register below covers it.
|
||||
if _, added := setOfKeys[field.Text()]; !added {
|
||||
mapOfKeys[field.Text()] = append(mapOfKeys[field.Name], &field)
|
||||
}
|
||||
// Always register the logical key so that every alias in IntrinsicFields
|
||||
// (e.g. "message", "body_v2.message") independently resolves to the same
|
||||
// physical field in the keys map.
|
||||
mapOfKeys[key] = append(mapOfKeys[key], &field)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,7 +159,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
|
||||
fm := telemetrymetrics.NewFieldMapper()
|
||||
cb := telemetrymetrics.NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to load field keys: %v", err)
|
||||
|
||||
@@ -221,7 +221,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to load field keys: %v", err)
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.TraceAggregation] {
|
||||
fm := resourcefilter.NewFieldMapper()
|
||||
cb := resourcefilter.NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
|
||||
return resourcefilter.NewTraceResourceFilterStatementBuilder(
|
||||
@@ -368,7 +368,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
|
||||
@@ -664,7 +664,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
|
||||
@@ -771,7 +771,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = c.keysMap
|
||||
if mockMetadataStore.KeysMap == nil {
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
@@ -927,7 +927,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
|
||||
@@ -1145,7 +1145,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
|
||||
|
||||
@@ -1420,7 +1420,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
|
||||
|
||||
|
||||
@@ -388,7 +388,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
|
||||
@@ -504,7 +504,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ func TestTraceTimeRangeOptimization(t *testing.T) {
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore(nil)
|
||||
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
mockMetadataStore.KeysMap["trace_id"] = []*telemetrytypes.TelemetryFieldKey{{
|
||||
|
||||
@@ -40,7 +40,7 @@ type TelemetryFieldKey struct {
|
||||
JSONDataType *JSONDataType `json:"-"`
|
||||
JSONPlan JSONAccessPlan `json:"-"`
|
||||
Indexes []JSONDataTypeIndex `json:"-"`
|
||||
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
|
||||
Materialized bool `json:"-"` // refers to type hint in case of JSON column fields
|
||||
}
|
||||
|
||||
func (f *TelemetryFieldKey) KeyNameContainsArray() bool {
|
||||
|
||||
@@ -21,10 +21,20 @@ type MockMetadataStore struct {
|
||||
PromotedPathsMap map[string]bool
|
||||
LogsJSONIndexesMap map[string][]schemamigrator.Index
|
||||
LookupKeysMap map[telemetrytypes.MetricMetadataLookupKey]int64
|
||||
// StaticFields holds signal-specific intrinsic field definitions (e.g. telemetrylogs.IntrinsicFields).
|
||||
// It is injected into GetKeys / GetKey results, mirroring what the real metadata store does when
|
||||
// it reads telemetrylogs.IntrinsicFields directly. Callers pass their package's IntrinsicFields
|
||||
// map at construction time to avoid a circular import.
|
||||
StaticFields map[string]telemetrytypes.TelemetryFieldKey
|
||||
}
|
||||
|
||||
// NewMockMetadataStore creates a new instance of MockMetadataStore with initialized maps
|
||||
func NewMockMetadataStore() *MockMetadataStore {
|
||||
// NewMockMetadataStore creates a new instance of MockMetadataStore with initialized maps.
|
||||
// Pass the signal-specific intrinsic fields (e.g. telemetrylogs.IntrinsicFields) so the mock
|
||||
// mirrors what the real metadata store does when injecting those definitions into key results.
|
||||
func NewMockMetadataStore(intrinsicFields map[string]telemetrytypes.TelemetryFieldKey) *MockMetadataStore {
|
||||
if intrinsicFields == nil {
|
||||
intrinsicFields = make(map[string]telemetrytypes.TelemetryFieldKey)
|
||||
}
|
||||
return &MockMetadataStore{
|
||||
KeysMap: make(map[string][]*telemetrytypes.TelemetryFieldKey),
|
||||
RelatedValuesMap: make(map[string][]string),
|
||||
@@ -34,6 +44,7 @@ func NewMockMetadataStore() *MockMetadataStore {
|
||||
PromotedPathsMap: make(map[string]bool),
|
||||
LogsJSONIndexesMap: make(map[string][]schemamigrator.Index),
|
||||
LookupKeysMap: make(map[telemetrytypes.MetricMetadataLookupKey]int64),
|
||||
StaticFields: intrinsicFields,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,9 +58,8 @@ func (m *MockMetadataStore) GetKeys(ctx context.Context, fieldKeySelector *telem
|
||||
return m.KeysMap, true, nil
|
||||
}
|
||||
|
||||
// Apply selector logic
|
||||
// Apply selector logic from KeysMap
|
||||
for name, keys := range m.KeysMap {
|
||||
// Check if name matches
|
||||
if matchesName(fieldKeySelector, name) {
|
||||
filteredKeys := []*telemetrytypes.TelemetryFieldKey{}
|
||||
for _, key := range keys {
|
||||
@@ -63,6 +73,31 @@ func (m *MockMetadataStore) GetKeys(ctx context.Context, fieldKeySelector *telem
|
||||
}
|
||||
}
|
||||
|
||||
// Inject StaticFields (e.g. IntrinsicFields), mirroring the real metadata store.
|
||||
// StaticFields take precedence over KeysMap entries for the same logical key.
|
||||
// Each logical key always gets its own entry (so "message" and "body.message" both
|
||||
// resolve to the IntrinsicField independently). The physical name is registered
|
||||
// only once to avoid duplicate entries in that slot.
|
||||
injectedPhysKeys := make(map[string]bool)
|
||||
for key, field := range m.StaticFields {
|
||||
if !matchesName(fieldKeySelector, key) {
|
||||
continue
|
||||
}
|
||||
fieldCopy := field
|
||||
|
||||
// Always register the logical key (override any KeysMap entries).
|
||||
result[key] = []*telemetrytypes.TelemetryFieldKey{&fieldCopy}
|
||||
|
||||
// Register by physical name only once when it differs from the logical key.
|
||||
if key != fieldCopy.Name {
|
||||
physKey := fieldCopy.Name + ";" + fieldCopy.FieldContext.StringValue() + ";" + fieldCopy.FieldDataType.StringValue()
|
||||
if !injectedPhysKeys[physKey] {
|
||||
injectedPhysKeys[physKey] = true
|
||||
result[fieldCopy.Name] = append(result[fieldCopy.Name], &fieldCopy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result, true, nil
|
||||
}
|
||||
|
||||
@@ -109,7 +144,7 @@ func (m *MockMetadataStore) GetKey(ctx context.Context, fieldKeySelector *teleme
|
||||
|
||||
result := []*telemetrytypes.TelemetryFieldKey{}
|
||||
|
||||
// Find keys matching the selector
|
||||
// Find keys matching the selector from KeysMap
|
||||
for name, keys := range m.KeysMap {
|
||||
if matchesName(fieldKeySelector, name) {
|
||||
for _, key := range keys {
|
||||
@@ -120,6 +155,14 @@ func (m *MockMetadataStore) GetKey(ctx context.Context, fieldKeySelector *teleme
|
||||
}
|
||||
}
|
||||
|
||||
// Add matching StaticFields (e.g. IntrinsicFields), same as the real metadata store does
|
||||
for key, field := range m.StaticFields {
|
||||
if fieldKeySelector.Name == "" || strings.Contains(key, fieldKeySelector.Name) {
|
||||
fieldCopy := field
|
||||
result = append(result, &fieldCopy)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -291,7 +334,7 @@ func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, queryTime
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// FetchTemporalityMulti fetches the temporality for multiple metrics
|
||||
// FetchTemporalityAndTypeMulti fetches the temporality and type for multiple metrics
|
||||
func (m *MockMetadataStore) FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error) {
|
||||
temporalities := make(map[string]metrictypes.Temporality)
|
||||
types := make(map[string]metrictypes.Type)
|
||||
|
||||
Reference in New Issue
Block a user